Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/08 09:22:12 UTC

[GitHub] [pulsar] hangc0276 commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

hangc0276 commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845892578


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {

Review Comment:
   Please add a `@NotNull` check.



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private int currentOffset = 0;
+
+    private byte[] readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()

Review Comment:
   Do we need to keep filling the ByteBuf until `readLen` bytes have been read?
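
To make the question concrete, here is a minimal sketch of what "filling until `readLen`" could look like. It is not the PR's code: `EntryDrainer` and `drain` are hypothetical names, and `java.nio.ByteBuffer` stands in for the Netty `ByteBuf` entries held in `entriesByteBuf`. The loop keeps copying from queued entries until `len` bytes have been copied or the queue runs dry.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, for illustration only; not part of the PR under review.
final class EntryDrainer {
    private EntryDrainer() {
    }

    // Copies up to len bytes from the queued entries into dst, starting at off.
    // Returns the number of bytes copied; this is less than len only when the
    // queue runs out of data first.
    static int drain(Deque<ByteBuffer> entries, byte[] dst, int off, int len) {
        int copied = 0;
        while (copied < len && !entries.isEmpty()) {
            ByteBuffer head = entries.peekFirst();
            int n = Math.min(len - copied, head.remaining());
            head.get(dst, off + copied, n);
            copied += n;
            if (!head.hasRemaining()) {
                entries.pollFirst(); // entry fully consumed, move on to the next one
            }
        }
        return copied;
    }
}
```

A short read is still legal for `InputStream.read(byte[], int, int)`, so whether to loop like this is a trade-off between fewer calls and simpler code rather than a correctness requirement.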



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;

Review Comment:
   +1



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {

Review Comment:
   ```
   if (b == null) {
       throw new NullPointerException();
   } else if (off < 0 || len < 0 || len > b.length - off) {
       throw new IndexOutOfBoundsException();
   } else if (len == 0) {
       return 0;
   }
   ```
   Please add the bounds check.
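
As a rough illustration of where these checks would sit, the sketch below places them at the top of `read(byte[], int, int)` in a small stand-alone stream. `BoundsCheckedStream` is a hypothetical class backed by a byte array, not the `BlockAwareSegmentInputStreamImpl` from the PR, and its `read` methods omit `throws IOException` only because this toy version never performs I/O.

```java
import java.io.InputStream;

// Hypothetical stand-in for the stream under review; only the argument checks matter here.
final class BoundsCheckedStream extends InputStream {
    private final byte[] data;
    private int pos = 0;

    BoundsCheckedStream(byte[] data) {
        this.data = data.clone();
    }

    @Override
    public int read() {
        return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        // Same checks, in the same order, as in the review suggestion.
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        if (pos >= data.length) {
            return -1; // end of stream
        }
        int n = Math.min(len, data.length - pos);
        System.arraycopy(data, pos, b, off, n);
        pos += n;
        return n;
    }
}
```

Writing the range test as `len > b.length - off` rather than `off + len > b.length` avoids integer overflow when `off` and `len` are both large.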



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);
+            for (int i = 0; i < readEntries.length; i++) {
+                b[offset + i] = readEntries[i];
+                readBytes++;
+            }
+            return readBytes;
+        }
+
+        // reached end
+        return -1;

Review Comment:
   If we have already read some bytes from `dataBlockHeaderStream` and the `bytesReadOffset < blockSize` check then fails, is it right to just return `-1`?
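
One way to picture the concern is the sketch below, which assumes a simplified model of the stream: a header followed by a body, both backed by byte arrays. `HeaderThenBodyStream` is a hypothetical class, not the PR's implementation. It returns the bytes already copied from the header when no body data follows and reports `-1` only when nothing was read at all. It also tracks progress with a single counter, which avoids the separate `readLen` bookkeeping where the quoted diff has `readLen -= readLen`.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Hypothetical two-part stream, for illustration only; not the PR's implementation.
final class HeaderThenBodyStream extends InputStream {
    private final ByteArrayInputStream header;
    private final ByteArrayInputStream body;

    HeaderThenBodyStream(byte[] header, byte[] body) {
        this.header = new ByteArrayInputStream(header);
        this.body = new ByteArrayInputStream(body);
    }

    @Override
    public int read() {
        int v = header.read();
        return v >= 0 ? v : body.read();
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (len == 0) {
            return 0;
        }
        int readBytes = 0;
        // Header first; argument validation is delegated to ByteArrayInputStream.
        if (header.available() > 0) {
            readBytes += header.read(b, off, len);
        }
        // Then the body, if the caller's buffer still has room.
        if (readBytes < len && body.available() > 0) {
            readBytes += body.read(b, off + readBytes, len - readBytes);
        }
        // Report end of stream only when this call copied nothing.
        return readBytes > 0 ? readBytes : -1;
    }
}
```

With this shape, a call that drains the last header bytes returns their count, and only the following call returns `-1`.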



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);
+            for (int i = 0; i < readEntries.length; i++) {
+                b[offset + i] = readEntries[i];
+                readBytes++;
+            }
+            return readBytes;
+        }
+
+        // reached end
+        return -1;

Review Comment:
   Please add a test for this case


