Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/15 17:04:26 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #23196: Improve the performance of TextSource by reducing how many byte[]s are copied (fixes #23193)

lukecwik commented on code in PR #23196:
URL: https://github.com/apache/beam/pull/23196#discussion_r972235373


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java:
##########
@@ -152,131 +160,260 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
           // all the bytes of the delimiter in the call to findDelimiterBounds() below
           requiredPosition = startOffset - delimiter.length;
         }
-        ((SeekableByteChannel) channel).position(requiredPosition);
-        findDelimiterBounds();
-        buffer = buffer.substring(endOfDelimiterInBuffer);
-        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
-        endOfDelimiterInBuffer = 0;
-        startOfDelimiterInBuffer = 0;
+
+        // Handle the case where the requiredPosition is at the beginning of the file so we can
+        // skip over UTF8_BOM if present.
+        if (requiredPosition < UTF8_BOM.size()) {
+          ((SeekableByteChannel) channel).position(0);
+          if (fileStartsWithBom()) {
+            startOfNextRecord = bufferPosn = UTF8_BOM.size();
+          } else {
+            startOfNextRecord = bufferPosn = (int) requiredPosition;
+          }
+        } else {
+          ((SeekableByteChannel) channel).position(requiredPosition);
+          startOfNextRecord = requiredPosition;
+        }
+
+        // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point
+        // to the beginning of the next record.
+        readNextRecord();
+        currentValue = null;
+      } else {
+        // Check whether the file starts with the UTF8_BOM bytes, skipping them if present.
+        if (fileStartsWithBom()) {
+          startOfNextRecord = bufferPosn = UTF8_BOM.size();
+        }
+      }
+    }
+
+    private boolean fileStartsWithBom() throws IOException {
+      for (; ; ) {
+        int bytesRead = inChannel.read(byteBuffer);
+        if (bytesRead == -1) {
+          return false;
+        } else {
+          bufferLength += bytesRead;
+        }
+        if (bufferLength >= UTF8_BOM.size()) {
+          int i;
+          for (i = 0; i < UTF8_BOM.size() && buffer[i] == UTF8_BOM.byteAt(i); ++i) {}
+          if (i == UTF8_BOM.size()) {
+            return true;
+          }
+          return false;
+        }
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      startOfRecord = startOfNextRecord;
+
+      // If we reached EOF last time around, mark that we don't have an
+      // element and return false.
+      if (eof) {
+        currentValue = null;
+        return false;
+      }
+
+      if (delimiter == null) {
+        return readDefaultLine();
+      } else {
+        return readCustomLine();
       }
     }
 
     /**
-     * Locates the start position and end position of the next delimiter. Will consume the channel
-     * till either EOF or the delimiter bounds are found.
+     * Loosely based upon <a
+     * href="https://github.com/hanborq/hadoop/blob/master/src/core/org/apache/hadoop/util/LineReader.java">Hadoop
+     * LineReader.java</a>
      *
-     * <p>This fills the buffer and updates the positions as follows:
+     * <p>We're reading data from inChannel, but the head of the stream may be already buffered in
+     * buffer, so we have several cases: 1. No newline characters are in the buffer, so we need to
+     * copy everything and read another buffer from the stream. 2. An unambiguously terminated line
+     * is in buffer, so we just create currentValue 3. Ambiguously terminated line is in buffer,

Review Comment:
   Done
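
For readers following the BOM handling in the hunk above, here is a minimal standalone sketch of the same idea: read the first few bytes of a channel and compare them against the UTF-8 byte order mark (EF BB BF). It is not the code from this PR. The class `BomSketch` and the helpers `bomLength` and `bomLengthOf` are hypothetical names, and unlike `fileStartsWithBom()` in the diff, which reads into the reader's shared buffer, this version consumes the bytes it inspects.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class BomSketch {
  // UTF-8 byte order mark: EF BB BF.
  private static final byte[] UTF8_BOM = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};

  /**
   * Returns how many leading bytes to skip: UTF8_BOM.length if the channel
   * starts with a UTF-8 BOM, otherwise 0. Assumes a blocking channel.
   */
  static int bomLength(ReadableByteChannel channel) throws IOException {
    ByteBuffer head = ByteBuffer.allocate(UTF8_BOM.length);
    // A single read may return fewer bytes than requested, so keep reading
    // until the small buffer is full or the channel reports EOF.
    while (head.hasRemaining()) {
      if (channel.read(head) == -1) {
        break;
      }
    }
    if (head.position() < UTF8_BOM.length) {
      return 0; // Input is shorter than the BOM, so it cannot start with one.
    }
    for (int i = 0; i < UTF8_BOM.length; i++) {
      if (head.get(i) != UTF8_BOM[i]) {
        return 0;
      }
    }
    return UTF8_BOM.length;
  }

  /** Hypothetical convenience wrapper for in-memory input. */
  static int bomLengthOf(byte[] bytes) {
    try {
      return bomLength(Channels.newChannel(new ByteArrayInputStream(bytes)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A caller would use the returned length as the initial value for both the record offset and the buffer position, which is the role `startOfNextRecord = bufferPosn = UTF8_BOM.size()` plays in the diff.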


