You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2021/12/14 06:43:58 UTC

[beam] branch master updated: [BEAM-13015] Allow decoding a set of elements until we hit the block boundary. (#16220)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb18c7  [BEAM-13015] Allow decoding a set of elements until we hit the block boundary. (#16220)
ffb18c7 is described below

commit ffb18c79127e85faa9dea7104c5d3e145fdfaf9a
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Mon Dec 13 22:42:03 2021 -0800

    [BEAM-13015] Allow decoding a set of elements until we hit the block boundary. (#16220)
    
    This is in preparation for loading blocks at a time into a cache.
---
 .../org/apache/beam/sdk/fn/stream/DataStreams.java | 29 +++++++-
 .../apache/beam/sdk/fn/stream/DataStreamsTest.java | 84 ++++++++++++++++++----
 2 files changed, 97 insertions(+), 16 deletions(-)

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index a5e485e..52af762 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -22,7 +22,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
@@ -179,6 +181,24 @@ public class DataStreams {
       this.inbound = new Inbound();
     }
 
+    public void seekToNextByteString() { // Discards the rest of the current chunk.
+      inbound.currentStream = inputByteStrings.next().newInput(); // Caller ensures a chunk remains.
+      inbound.position = 0; // position is relative to the current chunk.
+    }
+
+    public List<T> decodeTillAtChunkBoundary() { // Decodes elements starting in this chunk.
+      try {
+        InputStream previousStream = inbound.currentStream; // The chunk we started in.
+        List<T> rvals = new ArrayList<>();
+        while (previousStream == inbound.currentStream && inbound.currentStream.available() != 0) {
+          rvals.add(next()); // May read into later chunks, which ends the loop.
+        }
+        return rvals; // Empty if the current chunk had no unread bytes.
+      } catch (IOException e) {
+        throw new IllegalStateException(e); // Checked exception from InputStream.available().
+      }
+    }
+
     @Override
     public boolean isReady() {
       try {
@@ -212,9 +232,10 @@ public class DataStreams {
 
       try {
         long previousPosition = inbound.position;
+        InputStream previousStream = inbound.currentStream;
         T next = coder.decode(inbound);
         // Skip one byte if decoding the value consumed 0 bytes.
-        if (inbound.position - previousPosition == 0) {
+        if (previousPosition == inbound.position && previousStream == inbound.currentStream) {
           checkState(inbound.read() != -1, "Unexpected EOF reached");
         }
         return next;
@@ -237,7 +258,7 @@ public class DataStreams {
      * <p>Closing this input stream has no effect.
      */
     private class Inbound extends InputStream {
-      private long position;
+      private int position; // Position within the current input stream.
       private InputStream currentStream;
 
       public Inbound() {
@@ -256,6 +277,7 @@ public class DataStreams {
             return true;
           }
           currentStream = inputByteStrings.next().newInput();
+          position = 0;
         }
         return true;
       }
@@ -269,6 +291,7 @@ public class DataStreams {
             return true;
           }
           currentStream = inputByteStrings.next().newInput();
+          position = 0;
         }
         return false;
       }
@@ -282,6 +305,7 @@ public class DataStreams {
             return -1;
           }
           currentStream = inputByteStrings.next().newInput();
+          position = 0;
         }
         position += 1;
         return read;
@@ -302,6 +326,7 @@ public class DataStreams {
               return bytesRead > 0 ? bytesRead : -1;
             }
             currentStream = inputByteStrings.next().newInput();
+            position = 0;
           }
           remainingLen -= read;
         }
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
index 8d032bc..f3a6379 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.fn.stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -29,7 +31,6 @@ import static org.junit.Assume.assumeTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.coders.Coder;
@@ -80,19 +81,9 @@ public class DataStreamsTest {
     @Test
     public void testPrefetch() throws Exception {
       List<ByteString> encodings = new ArrayList<>();
-      {
-        ByteString.Output encoding = ByteString.newOutput();
-        StringUtf8Coder.of().encode("A", encoding);
-        StringUtf8Coder.of().encode("BC", encoding);
-        encodings.add(encoding.toByteString());
-      }
+      encodings.add(encode("A", "BC"));
       encodings.add(ByteString.EMPTY);
-      {
-        ByteString.Output encoding = ByteString.newOutput();
-        StringUtf8Coder.of().encode("DEF", encoding);
-        StringUtf8Coder.of().encode("GHIJ", encoding);
-        encodings.add(encoding.toByteString());
-      }
+      encodings.add(encode("DEF", "GHIJ"));
 
       PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<ByteString> iterator =
           new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(encodings.iterator());
@@ -126,6 +117,71 @@ public class DataStreamsTest {
       assertTrue(decoder.isReady());
     }
 
+    @Test
+    public void testSeekToNextByteString() throws Exception {
+      DataStreamDecoder<String> decoder =
+          new DataStreamDecoder<>(
+              StringUtf8Coder.of(),
+              new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(
+                  Iterators.forArray(
+                      encode("A"), encode("B", "C"), encode("D"), encode("E", "F"), encode("G"))));
+
+      // Seek to load the first byte string ("A")
+      decoder.seekToNextByteString();
+      assertTrue(decoder.isReady());
+      assertEquals("A", decoder.next());
+      assertEquals("B", decoder.next());
+      // Seek from the middle of the "B", "C" byte string, skipping "C"
+      decoder.seekToNextByteString();
+      assertEquals("D", decoder.next());
+      // Seek when at the end of "D", loading the "E", "F" byte string
+      decoder.seekToNextByteString();
+      // Seek when at the beginning of "E", skipping both "E" and "F"
+      decoder.seekToNextByteString(); // now at the beginning of "G"
+      assertEquals("G", decoder.next());
+    }
+
+    @Test
+    public void testDecodeRemainderInCurrentChunk() throws Exception {
+      ByteString multipleElementsToSplit = encode("B", "BigElementC");
+      ByteString singleElementToSplit = encode("BigElementG");
+      DataStreamDecoder<String> decoder =
+          new DataStreamDecoder<>(
+              StringUtf8Coder.of(),
+              new PrefetchableIteratorsTest.ReadyAfterPrefetchUntilNext<>(
+                  Iterators.forArray(
+                      encode("A"),
+                      multipleElementsToSplit.substring(0, multipleElementsToSplit.size() - 1),
+                      multipleElementsToSplit.substring(multipleElementsToSplit.size() - 1),
+                      encode("D"),
+                      encode(), // An empty chunk.
+                      encode("E", "F"),
+                      singleElementToSplit.substring(0, singleElementToSplit.size() - 1),
+                      singleElementToSplit.substring(singleElementToSplit.size() - 1))));
+
+      decoder.seekToNextByteString();
+      assertThat(decoder.decodeTillAtChunkBoundary(), contains("A"));
+      decoder.seekToNextByteString(); // "BigElementC" starts here and ends in the next chunk.
+      assertThat(decoder.decodeTillAtChunkBoundary(), contains("B", "BigElementC"));
+      decoder.seekToNextByteString(); // From the consumed one-byte remainder to "D".
+      assertThat(decoder.decodeTillAtChunkBoundary(), contains("D"));
+      decoder.seekToNextByteString(); // The empty chunk yields no elements.
+      assertThat(decoder.decodeTillAtChunkBoundary(), is(empty()));
+      decoder.seekToNextByteString();
+      assertThat(decoder.decodeTillAtChunkBoundary(), contains("E", "F"));
+      decoder.seekToNextByteString(); // "BigElementG" also spans two chunks.
+      assertThat(decoder.decodeTillAtChunkBoundary(), contains("BigElementG"));
+      assertFalse(decoder.hasNext()); // The one-byte remainder was consumed by decoding.
+    }
+
+    private ByteString encode(String... values) throws IOException {
+      ByteString.Output out = ByteString.newOutput();
+      for (String value : values) { // Zero values yields an empty chunk.
+        StringUtf8Coder.of().encode(value, out);
+      }
+      return out.toByteString(); // One chunk with the encodings concatenated.
+    }
+
     private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
       ByteString.Output output = ByteString.newOutput();
       for (T value : expected) {
@@ -142,7 +198,7 @@ public class DataStreamsTest {
     }
 
     private <T> void testDecoderWith(Coder<T> coder, T[] expected, List<ByteString> encoded) {
-      Iterator<T> decoder =
+      DataStreamDecoder<T> decoder =
           new DataStreamDecoder<>(
               coder, PrefetchableIterators.maybePrefetchable(encoded.iterator()));