You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/09/27 03:31:37 UTC

[beam] branch master updated: Fix TextSource incorrect handling in channels that return short reads. (#23376)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dd1c6bc4a4 Fix TextSource incorrect handling in channels that return short reads. (#23376)
7dd1c6bc4a4 is described below

commit 7dd1c6bc4a453e03bc000710d51e5ece65011eb2
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Sep 26 20:31:30 2022 -0700

    Fix TextSource incorrect handling in channels that return short reads. (#23376)
    
    * Fix TextSource incorrect handling in channels that return short reads.
    
    The issue is that readDefaultLine and readCustomLine were incorrectly calculating the appendLength when a read from the channel returned zero bytes.
    This was solved by ensuring that the internal read loop always reads at least one byte (or reaches EOF), which guarantees that each iteration makes progress. For readDefaultLine, we now track whether an LF must be skipped at the start of the next buffer when the current buffer ended with a CR. For readCustomLine, we remember how much of the delimiter had already been matched before the current buffer, so that only the portion of the delimiter matched within this buffer is excluded from the appended data.
    
    The bug was introduced in https://github.com/apache/beam/commit/30a48f05cf2ee0eea0a304fea01eb40f323f9f3c
    
    There was no noticeable change in the TextSourceBenchmark performance results.
    
    Fixes #23375
---
 .../java/org/apache/beam/sdk/io/TextSource.java    |  56 +++++++----
 .../org/apache/beam/sdk/io/TextIOReadTest.java     | 111 ++++++++++++++++++++-
 2 files changed, 144 insertions(+), 23 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index e0aadf79fce..c800f50e49d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -109,6 +109,7 @@ class TextSource extends FileBasedSource<String> {
     private volatile @Nullable String currentValue;
     private int bufferLength = 0; // the number of bytes of real data in the buffer
     private int bufferPosn = 0; // the current position in the buffer
+    private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer
 
     private TextBasedReader(TextSource source, byte[] delimiter) {
       super(source);
@@ -249,17 +250,16 @@ class TextSource extends FileBasedSource<String> {
       assert !eof;
 
       int newlineLength = 0; // length of terminating newline
-      boolean prevCharCR = false; // true of prev char was CR
+      boolean prevCharCR = false; // true if prev char was CR
       long bytesConsumed = 0;
+      EOF:
       for (; ; ) {
         int startPosn = bufferPosn; // starting from where we left off the last time
 
-        // Read the next chunk from the file
-        if (bufferPosn == bufferLength) {
+        // Read the next chunk from the file, ensure that we read at least one byte
+        // or reach EOF.
+        while (bufferPosn == bufferLength) {
           startPosn = bufferPosn = 0;
-          if (prevCharCR) {
-            ++bytesConsumed; // account for CR from previous read
-          }
           byteBuffer.clear();
           bufferLength = inChannel.read(byteBuffer);
 
@@ -273,10 +273,18 @@ class TextSource extends FileBasedSource<String> {
             }
 
             currentValue = str.toString(StandardCharsets.UTF_8.name());
-            break;
+            break EOF;
           }
         }
 
+        // Consume any LF after CR if it is the first character of the next buffer
+        if (skipLineFeedAtStart && buffer[bufferPosn] == LF) {
+          ++bytesConsumed;
+          ++startPosn;
+          ++bufferPosn;
+          skipLineFeedAtStart = false;
+        }
+
         // Search for the newline
         for (; bufferPosn < bufferLength; ++bufferPosn) {
           if (buffer[bufferPosn] == LF) {
@@ -291,20 +299,23 @@ class TextSource extends FileBasedSource<String> {
           prevCharCR = (buffer[bufferPosn] == CR);
         }
 
-        int readLength = bufferPosn - startPosn;
-        if (prevCharCR && newlineLength == 0) {
-          --readLength; // CR at the end of the buffer
+        // CR at the end of the buffer
+        if (newlineLength == 0 && prevCharCR) {
+          skipLineFeedAtStart = true;
+          newlineLength = 1;
+        } else {
+          skipLineFeedAtStart = false;
         }
-        bytesConsumed += readLength;
 
+        int readLength = bufferPosn - startPosn;
+        bytesConsumed += readLength;
+        int appendLength = readLength - newlineLength;
         if (newlineLength == 0) {
-          // Append the prefix of the value to str until we find a newline
-          str.write(buffer, startPosn, readLength);
+          // Append the prefix of the value to str skipping the partial delimiter
+          str.write(buffer, startPosn, appendLength);
         } else {
-          int appendLength = readLength - newlineLength;
-
-          // Optimize for the common case where the string is wholly contained within the buffer
           if (str.size() == 0) {
+            // Optimize for the common case where the string is wholly contained within the buffer
             currentValue = new String(buffer, startPosn, appendLength, StandardCharsets.UTF_8);
           } else {
             str.write(buffer, startPosn, appendLength);
@@ -313,6 +324,7 @@ class TextSource extends FileBasedSource<String> {
           break;
         }
       }
+
       startOfNextRecord = startOfRecord + bytesConsumed;
       str.reset();
       return true;
@@ -331,11 +343,13 @@ class TextSource extends FileBasedSource<String> {
 
       long bytesConsumed = 0;
       int delPosn = 0;
+      EOF:
       for (; ; ) {
         int startPosn = bufferPosn; // starting from where we left off the last time
 
-        // Read the next chunk from the file
-        if (bufferPosn >= bufferLength) {
+        // Read the next chunk from the file, ensure that we read at least one byte
+        // or reach EOF.
+        while (bufferPosn >= bufferLength) {
           startPosn = bufferPosn = 0;
           byteBuffer.clear();
           bufferLength = inChannel.read(byteBuffer);
@@ -355,17 +369,17 @@ class TextSource extends FileBasedSource<String> {
             }
 
             currentValue = str.toString(StandardCharsets.UTF_8.name());
-            break; // EOF
+            break EOF;
           }
         }
 
+        int prevDelPosn = delPosn;
         DELIMITER_MATCH:
         {
           if (delPosn > 0) {
             // slow-path: Handle the case where we only matched part of the delimiter, possibly
             // adding that to str fixing up any partially consumed delimiter if we don't match the
             // whole delimiter
-            int prevDelPosn = delPosn;
             for (; bufferPosn < bufferLength; ++bufferPosn) {
               if (buffer[bufferPosn] == delimiter[delPosn]) {
                 delPosn++;
@@ -399,7 +413,7 @@ class TextSource extends FileBasedSource<String> {
 
         int readLength = bufferPosn - startPosn;
         bytesConsumed += readLength;
-        int appendLength = readLength - delPosn;
+        int appendLength = readLength - (delPosn - prevDelPosn);
         if (delPosn < delimiter.length) {
           // Append the prefix of the value to str skipping the partial delimiter
           str.write(buffer, startPosn, appendLength);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 2e447255b7d..414114ff42a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -43,9 +43,13 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -59,7 +63,9 @@ import java.util.zip.ZipOutputStream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -337,7 +343,7 @@ public class TextIOReadTest {
 
   /** Tests for reading files with various delimiters. */
   @RunWith(Parameterized.class)
-  public static class ReadWithDelimiterTest {
+  public static class ReadWithDefaultDelimiterTest {
     private static final ImmutableList<String> EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz");
     @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -363,10 +369,59 @@ public class TextIOReadTest {
     public ImmutableList<String> expected;
 
     @Test
-    public void testReadLinesWithDelimiter() throws Exception {
+    public void testReadLinesWithDefaultDelimiter() throws Exception {
       runTestReadWithData(line.getBytes(UTF_8), expected);
     }
 
+    @Test
+    public void testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel()
+        throws Exception {
+      Path path = tempFolder.newFile().toPath();
+      Files.write(path, line.getBytes(UTF_8));
+      Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
+      FileBasedSource source =
+          getTextSource(path.toString(), null)
+              .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
+      FileBasedReader<String> reader =
+          source.createSingleFileReader(PipelineOptionsFactory.create());
+      ReadableByteChannel channel =
+          FileSystems.open(
+              FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
+      InputStream stream = Channels.newInputStream(channel);
+      reader.startReading(
+          // Placeholder channel that only yields 0- and 1-length buffers.
+          // Data is read at most one byte at a time from line parameter.
+          new ReadableByteChannel() {
+            int readCount = 0;
+
+            @Override
+            public int read(ByteBuffer dst) throws IOException {
+              if (++readCount % 3 == 0) {
+                if (dst.hasRemaining()) {
+                  int value = stream.read();
+                  if (value == -1) {
+                    return -1;
+                  }
+                  dst.put((byte) value);
+                  return 1;
+                }
+              }
+              return 0;
+            }
+
+            @Override
+            public boolean isOpen() {
+              return channel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+              stream.close();
+            }
+          });
+      assertEquals(expected, SourceTestUtils.readFromStartedReader(reader));
+    }
+
     @Test
     public void testSplittingSource() throws Exception {
       TextSource source = prepareSource(line.getBytes(UTF_8));
@@ -421,6 +476,58 @@ public class TextIOReadTest {
           TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}),
           PipelineOptionsFactory.create());
     }
+
+    @Test
+    public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel()
+        throws Exception {
+      byte[] delimiter = new byte[] {'|', '*'};
+      Path path = tempFolder.newFile().toPath();
+      Files.write(path, testCase.getBytes(UTF_8));
+      Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
+      FileBasedSource source =
+          getTextSource(path.toString(), delimiter)
+              .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
+      FileBasedReader<String> reader =
+          source.createSingleFileReader(PipelineOptionsFactory.create());
+      ReadableByteChannel channel =
+          FileSystems.open(
+              FileSystems.matchSingleFileSpec(source.getFileOrPatternSpec()).resourceId());
+      InputStream stream = Channels.newInputStream(channel);
+      reader.startReading(
+          // Placeholder channel that only yields 0- and 1-length buffers.
+          // Data is read at most one byte at a time from testCase parameter.
+          new ReadableByteChannel() {
+            int readCount = 0;
+
+            @Override
+            public int read(ByteBuffer dst) throws IOException {
+              if (++readCount % 3 == 0) {
+                if (dst.hasRemaining()) {
+                  int value = stream.read();
+                  if (value == -1) {
+                    return -1;
+                  }
+                  dst.put((byte) value);
+                  return 1;
+                }
+              }
+              return 0;
+            }
+
+            @Override
+            public boolean isOpen() {
+              return channel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+              stream.close();
+            }
+          });
+      assertEquals(
+          SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()),
+          SourceTestUtils.readFromStartedReader(reader));
+    }
   }
 
   /** Tests for some basic operations in {@link TextIO.Read}. */