You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2021/12/14 06:43:58 UTC
[beam] branch master updated: [BEAM-13015] Allow decoding a set of elements until we hit the block boundary. (#16220)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ffb18c7 [BEAM-13015] Allow decoding a set of elements until we hit the block boundary. (#16220)
ffb18c7 is described below
commit ffb18c79127e85faa9dea7104c5d3e145fdfaf9a
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Mon Dec 13 22:42:03 2021 -0800
[BEAM-13015] Allow decoding a set of elements until we hit the block boundary. (#16220)
This is in preparation for loading whole blocks at a time into a cache.
---
.../org/apache/beam/sdk/fn/stream/DataStreams.java | 29 +++++++-
.../apache/beam/sdk/fn/stream/DataStreamsTest.java | 84 ++++++++++++++++++----
2 files changed, 97 insertions(+), 16 deletions(-)
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index a5e485e..52af762 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -22,7 +22,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
@@ -179,6 +181,24 @@ public class DataStreams {
this.inbound = new Inbound();
}
+ public void seekToNextByteString() { // Drops any unread bytes of the current chunk and starts reading the next chunk.
+ inbound.currentStream = inputByteStrings.next().newInput(); // NOTE(review): no hasNext() check; presumably throws if no chunk remains.
+ inbound.position = 0; // Positions are tracked relative to the current chunk.
+ }
+
+ public List<T> decodeTillAtChunkBoundary() { // Decodes elements until the current chunk is exhausted; an element straddling into the next chunk is included.
+ try {
+ InputStream previousStream = inbound.currentStream; // Identity of the chunk decoding started in.
+ List<T> rvals = new ArrayList<>();
+ while (previousStream == inbound.currentStream && inbound.currentStream.available() != 0) { // Stop once next() crossed into a later chunk or this chunk is drained.
+ rvals.add(next());
+ }
+ return rvals;
+ } catch (IOException e) { // available() is declared to throw IOException.
+ throw new IllegalStateException(e);
+ }
+ }
+
@Override
public boolean isReady() {
try {
@@ -212,9 +232,10 @@ public class DataStreams {
try {
long previousPosition = inbound.position;
+ InputStream previousStream = inbound.currentStream;
T next = coder.decode(inbound);
// Skip one byte if decoding the value consumed 0 bytes.
- if (inbound.position - previousPosition == 0) {
+ if (previousPosition == inbound.position && previousStream == inbound.currentStream) {
checkState(inbound.read() != -1, "Unexpected EOF reached");
}
return next;
@@ -237,7 +258,7 @@ public class DataStreams {
* <p>Closing this input stream has no effect.
*/
private class Inbound extends InputStream {
- private long position;
+ private int position; // Position within the current input stream.
private InputStream currentStream;
public Inbound() {
@@ -256,6 +277,7 @@ public class DataStreams {
return true;
}
currentStream = inputByteStrings.next().newInput();
+ position = 0;
}
return true;
}
@@ -269,6 +291,7 @@ public class DataStreams {
return true;
}
currentStream = inputByteStrings.next().newInput();
+ position = 0;
}
return false;
}
@@ -282,6 +305,7 @@ public class DataStreams {
return -1;
}
currentStream = inputByteStrings.next().newInput();
+ position = 0;
}
position += 1;
return read;
@@ -302,6 +326,7 @@ public class DataStreams {
return bytesRead > 0 ? bytesRead : -1;
}
currentStream = inputByteStrings.next().newInput();
+ position = 0;
}
remainingLen -= read;
}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
index 8d032bc..f3a6379 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.fn.stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -29,7 +31,6 @@ import static org.junit.Assume.assumeTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
@@ -80,19 +81,9 @@ public class DataStreamsTest {
@Test
public void testPrefetch() throws Exception {
List<ByteString> encodings = new ArrayList<>();
- {
- ByteString.Output encoding = ByteString.newOutput();
- StringUtf8Coder.of().encode("A", encoding);
- StringUtf8Coder.of().encode("BC", encoding);
- encodings.add(encoding.toByteString());
- }
+ encodings.add(encode("A", "BC"));
encodings.add(ByteString.EMPTY);
- {
- ByteString.Output encoding = ByteString.newOutput();
- StringUtf8Coder.of().encode("DEF", encoding);
- StringUtf8Coder.of().encode("GHIJ", encoding);
- encodings.add(encoding.toByteString());
- }
+ encodings.add(encode("DEF", "GHIJ"));
PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<ByteString> iterator =
new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(encodings.iterator());
@@ -126,6 +117,71 @@ public class DataStreamsTest {
assertTrue(decoder.isReady());
}
+ @Test
+ public void testSeekToNextByteString() throws Exception {
+ DataStreamDecoder<String> decoder =
+ new DataStreamDecoder<>(
+ StringUtf8Coder.of(),
+ new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(
+ Iterators.forArray(
+ encode("A"), encode("B", "C"), encode("D"), encode("E", "F"), encode("G"))));
+
+ // Seek and load the next byte string
+ decoder.seekToNextByteString();
+ assertTrue(decoder.isReady());
+ assertEquals("A", decoder.next());
+ assertEquals("B", decoder.next());
+ // Seek when in the middle of a byte string skipping "C"
+ decoder.seekToNextByteString();
+ assertEquals("D", decoder.next());
+ // Seek when at the end of "D" (moves to the byte string holding "E" and "F")
+ decoder.seekToNextByteString();
+ // Seek when at the beginning of "E" skipping both "E" and "F"
+ decoder.seekToNextByteString(); // called at the beginning of "E"; now at the beginning of "G"
+ assertEquals("G", decoder.next());
+ }
+
+ @Test
+ public void testDecodeRemainderInCurrentChunk() throws Exception {
+ ByteString multipleElementsToSplit = encode("B", "BigElementC"); // Split below so "BigElementC" straddles two chunks.
+ ByteString singleElementToSplit = encode("BigElementG"); // Split below so "BigElementG" straddles two chunks.
+ DataStreamDecoder<String> decoder =
+ new DataStreamDecoder<>(
+ StringUtf8Coder.of(),
+ new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(
+ Iterators.forArray(
+ encode("A"),
+ multipleElementsToSplit.substring(0, multipleElementsToSplit.size() - 1),
+ multipleElementsToSplit.substring(multipleElementsToSplit.size() - 1),
+ encode("D"),
+ encode(), // An empty chunk.
+ encode("E", "F"),
+ singleElementToSplit.substring(0, singleElementToSplit.size() - 1),
+ singleElementToSplit.substring(singleElementToSplit.size() - 1))));
+
+ decoder.seekToNextByteString();
+ assertThat(decoder.decodeTillAtChunkBoundary(), contains("A"));
+ decoder.seekToNextByteString();
+ assertThat(decoder.decodeTillAtChunkBoundary(), contains("B", "BigElementC")); // "BigElementC" finishes in the following one-byte chunk.
+ decoder.seekToNextByteString();
+ assertThat(decoder.decodeTillAtChunkBoundary(), contains("D"));
+ decoder.seekToNextByteString();
+ assertThat(decoder.decodeTillAtChunkBoundary(), is(empty()));
+ decoder.seekToNextByteString();
+ assertThat(decoder.decodeTillAtChunkBoundary(), contains("E", "F"));
+ decoder.seekToNextByteString();
+ assertThat(decoder.decodeTillAtChunkBoundary(), contains("BigElementG")); // Consumes the final one-byte chunk as well.
+ assertFalse(decoder.hasNext());
+ }
+
+ private ByteString encode(String... values) throws IOException { // Concatenates the StringUtf8Coder encodings of values into one ByteString.
+ ByteString.Output out = ByteString.newOutput();
+ for (String value : values) {
+ StringUtf8Coder.of().encode(value, out);
+ }
+ return out.toByteString(); // No values yields an empty ByteString.
+ }
+
private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
ByteString.Output output = ByteString.newOutput();
for (T value : expected) {
@@ -142,7 +198,7 @@ public class DataStreamsTest {
}
private <T> void testDecoderWith(Coder<T> coder, T[] expected, List<ByteString> encoded) {
- Iterator<T> decoder =
+ DataStreamDecoder<T> decoder =
new DataStreamDecoder<>(
coder, PrefetchableIterators.maybePrefetchable(encoded.iterator()));