Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/27 02:42:15 UTC

[GitHub] [pulsar] zymap opened a new pull request, #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

zymap opened a new pull request, #15063:
URL: https://github.com/apache/pulsar/pull/15063

   ---
   ### Motivation
   
   
   When offloading a ledger, BlockAwareSegmentInputStreamImpl wraps the
   ledger handle and exposes it as an input stream. jclouds then reads
   that stream as the payload and uploads it to the storage.
   The jclouds implementation reads the stream through a buffer:
   https://github.com/apache/jclouds/blob/36f351cd18925d2bb27bf7ad2c5d75e555da377a/core/src/main/java/org/jclouds/io/ByteStreams2.java#L68
   
   In the current offload implementation, each buffered read falls back to
   calling the single-byte read() repeatedly to fill the buffer before
   returning the data.
   After implementing read(byte[] b, int off, int len), CPU usage dropped
   by almost 10%.
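
The effect described above can be illustrated with a minimal, self-contained sketch. It is not the PR's code: `BulkReadDemo`, `SingleByteStream`, and `BulkStream` are hypothetical names, and a plain byte array stands in for the ledger data. It relies only on the documented behaviour of `java.io.InputStream`, whose default `read(byte[], int, int)` calls `read()` once per byte.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of Pulsar or jclouds.
final class BulkReadDemo {

    /** Stream that only implements the single-byte read() and counts its calls. */
    static class SingleByteStream extends InputStream {
        protected final byte[] data;
        protected int pos = 0;
        int singleByteCalls = 0;

        SingleByteStream(byte[] data) {
            this.data = data;
        }

        @Override
        public int read() {
            singleByteCalls++;
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }
    }

    /** Same stream, plus a bulk read that copies the requested range in one step. */
    static class BulkStream extends SingleByteStream {
        BulkStream(byte[] data) {
            super(data);
        }

        @Override
        public int read(byte[] b, int off, int len) {
            if (len == 0) {
                return 0;
            }
            if (pos >= data.length) {
                return -1;
            }
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }
    }

    /** Drains the stream through a buffer and returns how often read() was called. */
    static int singleByteCallsWhileDraining(SingleByteStream in, int bufferSize) {
        byte[] buffer = new byte[bufferSize];
        try {
            while (in.read(buffer, 0, buffer.length) != -1) {
                // The data is discarded; only the call count matters here.
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return in.singleByteCalls;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[4096];
        System.out.println(singleByteCallsWhileDraining(new SingleByteStream(payload), 1024));
        System.out.println(singleByteCallsWhileDraining(new BulkStream(payload), 1024));
    }
}
```

With the bulk override, the buffered caller never reaches `read()`; without it, every payload byte costs one virtual call. How much of the reported CPU saving comes from this alone is not something the sketch can show.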
   
   ### Modifications
   
   - Add a read(byte[] b, int off, int len) implementation to BlockAwareSegmentInputStreamImpl
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `no-need-doc` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-added`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868937456


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;

Review Comment:
   ~~Maybe `bytesReadOffset` should not be incremented here; related: https://github.com/apache/pulsar/pull/15063/files?diff=split&w=0#r868883170~~





[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1110477961

   /pulsarbot run-failure-checks
   
   




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845872380


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -44,6 +45,7 @@
     private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
 
     static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
+    static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD);

Review Comment:
   The code style of line_47 and line_48 could be made consistent.
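
For readers comparing the two constants in the hunk, the sketch below shows that they describe the same four bytes. It is an illustration under assumptions: `PaddingBytesDemo` and its members are hypothetical names, and `java.nio.ByteBuffer` is used in place of Guava's `Ints.toByteArray` so the example has no external dependency. Both produce big-endian output.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; the names below are not from the PR.
final class PaddingBytesDemo {

    // The int[] form from the diff: one array element per padding byte.
    static final int[] PADDING_AS_INTS = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };

    /** Packs 0xFEDCDEAD into four bytes, most significant byte first. */
    static byte[] paddingAsBytes() {
        // ByteBuffer defaults to big-endian order, the same order
        // Guava documents for Ints.toByteArray.
        return ByteBuffer.allocate(Integer.BYTES).putInt(0xFEDCDEAD).array();
    }

    /** Checks that both representations describe the same byte sequence. */
    static boolean sameBytes() {
        byte[] bytes = paddingAsBytes();
        if (bytes.length != PADDING_AS_INTS.length) {
            return false;
        }
        for (int i = 0; i < bytes.length; i++) {
            // Mask to compare the signed byte with the unsigned int literal.
            if ((bytes[i] & 0xFF) != PADDING_AS_INTS[i]) {
                return false;
            }
        }
        return true;
    }
}
```

Since either form can be derived from the other, keeping a single literal style for both declarations is a matter of readability only.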





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868870660


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {

Review Comment:
   There may be a problem here. If data has already been read into `b`, but `bytesReadOffset < blockSize`, it will return -1.
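
This concern touches on the general `InputStream` contract: a bulk read returns the number of bytes it stored, and returns -1 only when no byte could be read because the stream has ended. The following is a minimal sketch of that rule, not the PR's implementation. `HeaderThenBodyStream` and its `header` and `body` fields are hypothetical stand-ins for the header stream and the ledger entries.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Hypothetical two-part stream used only to illustrate the return-value rule.
final class HeaderThenBodyStream extends InputStream {
    private final ByteArrayInputStream header;
    private final ByteArrayInputStream body;

    HeaderThenBodyStream(byte[] header, byte[] body) {
        this.header = new ByteArrayInputStream(header);
        this.body = new ByteArrayInputStream(body);
    }

    @Override
    public int read() {
        int b = header.read();
        return b != -1 ? b : body.read();
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (len == 0) {
            return 0;
        }
        int total = 0;
        // First drain whatever is left of the header.
        if (header.available() > 0) {
            total += header.read(b, off, len);
        }
        // Then top up from the body if the caller's range is not full yet.
        if (total < len) {
            int n = body.read(b, off + total, len - total);
            if (n > 0) {
                total += n;
            }
        }
        // Report -1 only when this call produced no bytes at all.
        return total > 0 ? total : -1;
    }
}
```

With a 3-byte header and an empty body, the first call returns 3 and only the following call returns -1, so bytes already copied into `b` are never reported as end of stream.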





[GitHub] [pulsar] dlg99 commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861197843


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -186,6 +275,10 @@ public void close() throws IOException {
             entriesByteBuf.forEach(buf -> buf.release());
             entriesByteBuf.clear();
         }
+        if (paddingBuf.refCnt() != 0) {

Review Comment:
   +1





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845861127


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;

Review Comment:
   readLen -= read?





[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1110479472

   /pulsarbot run-failure-checks




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868937456


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;

Review Comment:
   Maybe `bytesReadOffset` should not be incremented here; related: https://github.com/apache/pulsar/pull/15063/files?diff=split&w=0#r868883170





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r849253359


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private int currentOffset = 0;
+
+    private byte[] readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
+            // always read from the first ByteBuf in the list, once read all of its content remove it.
+            ByteBuf entryByteBuf = entriesByteBuf.get(0);
+            int readableBytes = entryByteBuf.readableBytes();
+            int read = Math.min(readableBytes, len);
+            byte[] buf = new byte[read];

Review Comment:
   This will allocate bytes on the heap. Do you have any ideas? @merlimat @eolivelli @codelipenghui





[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1121060788

   ping @eolivelli 




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1111956321

   /pulsarbot run-failure-checks




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1111643989

   /pulsarbot run-failure-checks




[GitHub] [pulsar] mattisonchao commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1139227279

   Hi @zymap 
   
   Sorry, I made a mistake just now; that is why the PR was re-opened.




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1119169347

   @hangc0276 @horizonzy Could you please take another look as well? Thanks




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1119169090

   ping @eolivelli @dlg99 




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1119211896

   @horizonzy All the required tests have passed. Why do I need to rebase onto the latest master?




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1119247699

   @horizonzy rebased




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868937456


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;

Review Comment:
   ~~Maybe `bytesReadOffset` should not increase, relation: https://github.com/apache/pulsar/pull/15063/files?diff=split&w=0#r86888317~~





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861427829


##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -226,7 +236,12 @@ public void testHaveEndPadding() throws Exception {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());

Review Comment:
   Added new test to cover this https://github.com/apache/pulsar/pull/15063/files#diff-1f2edfe47e5f4f0d8b1634a21eeaccf78eb36d05c55d1c02f20f146247926defR764






[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r849305637


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private int currentOffset = 0;
+
+    private byte[] readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
+            // always read from the first ByteBuf in the list, once read all of its content remove it.
+            ByteBuf entryByteBuf = entriesByteBuf.get(0);
+            int readableBytes = entryByteBuf.readableBytes();
+            int read = Math.min(readableBytes, len);
+            byte[] buf = new byte[read];

Review Comment:
   Maybe we can create a byte array and reuse it to hold the data for every read operation? That would reduce the number of byte-array allocations.
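
A different way to sidestep the per-read allocation, sketched here under assumptions rather than taken from the PR, is to copy straight into the array the caller passed to `read(byte[], int, int)`. In this sketch `java.nio.ByteBuffer` stands in for the Netty `ByteBuf` used in the diff so the example needs no external dependency, and `DirectCopyDemo` and `readInto` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Deque;

// Hypothetical sketch; ByteBuffer stands in for the Netty ByteBuf in the PR.
final class DirectCopyDemo {

    /**
     * Copies up to len bytes from the head of the queue into dst, starting at off,
     * without allocating a temporary array. Returns the number of bytes copied.
     */
    static int readInto(Deque<ByteBuffer> entries, byte[] dst, int off, int len) {
        int copied = 0;
        while (copied < len && !entries.isEmpty()) {
            ByteBuffer head = entries.peekFirst();
            int n = Math.min(head.remaining(), len - copied);
            // The bulk get writes directly into the caller's array.
            head.get(dst, off + copied, n);
            copied += n;
            // Drop the buffer once all of its content has been consumed.
            if (!head.hasRemaining()) {
                entries.removeFirst();
            }
        }
        return copied;
    }
}
```

Compared with a reused scratch array, this removes one copy as well as the allocation, at the cost of passing the destination array down to the code that walks the entry buffers.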





[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1097493054

   @hangc0276 @horizonzy Could you please take another look?




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1104788587

   /pulsarbot run-failure-checks




[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861428242


##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -256,13 +280,35 @@ public void testHaveEndPadding() throws Exception {
         int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);
         assertEquals(left, 5);
         byte padding[] = new byte[left];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {

Review Comment:
   Updated tests
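
The loop in the hunk above exists because a single `read(byte[], int, int)` call may return fewer bytes than requested. A minimal sketch of that read-until-full pattern follows. `ReadLoopDemo` and `fill` are hypothetical names and this is not the test code from the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper illustrating a read-until-full loop.
final class ReadLoopDemo {

    /** Reads until dst is full or the stream ends; returns the bytes stored. */
    static int fill(InputStream in, byte[] dst) {
        int offset = 0;
        try {
            int ret;
            // Keep asking for the remaining range; stop on end of stream.
            while (offset < dst.length
                    && (ret = in.read(dst, offset, dst.length - offset)) > 0) {
                offset += ret;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offset;
    }
}
```

The explicit `offset < dst.length` guard avoids issuing a zero-length read once the array is full. The loop in the diff instead relies on such a read returning 0.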





[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1118035781

   @eolivelli @dlg99 Could you please take another look? Thanks




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868879307


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {

Review Comment:
   This may happen: when the block size is < 128, the problem will occur.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869394711


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -87,6 +94,50 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.topicName = ledgerName;
     }
 
+    private ByteBuf readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {

Review Comment:
   ~~`bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize` will split the same entry across two blocks. Will that cause a problem?~~
   Is this checked every time? Maybe it only needs to be checked when the first element of entriesByteBuf changes.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845874647


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private int currentOffset = 0;
+
+    private byte[] readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);

Review Comment:
   Should `ENTRIES_PER_READ` be made configurable by the user?





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845915109


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -44,6 +45,7 @@
     private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
 
     static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
+    static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD);

Review Comment:
   That should be fine?
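
The thread does not show the question this reply answers. If the concern was whether `BLOCK_END_PADDING_BYTES` holds the same four bytes as `BLOCK_END_PADDING`, the following minimal sketch checks that using only the JDK. It assumes big-endian encoding, which is the order Guava documents for `Ints.toByteArray`; the class and method names are hypothetical and not part of the PR.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the PR.
class PaddingBytesCheck {
    // Copied from the diff above.
    static final int[] BLOCK_END_PADDING = {0xFE, 0xDC, 0xDE, 0xAD};

    // Big-endian encoding of an int (ByteBuffer defaults to big-endian order).
    static byte[] bigEndian(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // True when each encoded byte, read as unsigned, equals the int[] constant.
    static boolean matches() {
        byte[] bytes = bigEndian(0xFEDCDEAD);
        for (int i = 0; i < bytes.length; i++) {
            if ((bytes[i] & 0xFF) != BLOCK_END_PADDING[i]) {
                return false;
            }
        }
        return true;
    }
}
```

The `& 0xFF` mask matters because Java bytes are signed, so `(byte) 0xFE` compares as -2 unless it is widened as an unsigned value.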





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868937456


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;

Review Comment:
   Maybe `bytesReadOffset` should be increased; related comment: https://github.com/apache/pulsar/pull/15063/files?diff=split&w=0#r868883170





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869496534


##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -530,4 +683,120 @@ public void testOnlyNegativeOnEOF() throws Exception {
         }
     }
 
+    @Test
+    public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException {
+        int ledgerId = 1;
+        int entrySize = 10000;
+        int lac = 0;
+
+        Random r = new Random(0);
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt());
+
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2;
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+
+        int bytesRead = 0;
+        int ret;
+        int offset = 0;
+        int resetOffsetCount = 0;
+        byte[] buf = new byte[1024];
+        while ((ret = inputStream.read(buf, offset, buf.length - offset)) > 0) {
+            bytesRead += ret;
+            int currentOffset = offset;
+            offset = (offset + ret) % buf.length;
+            if (offset < currentOffset) {
+                resetOffsetCount++;
+            }
+        }
+        assertEquals(bytesRead, blockSize);
+        assertNotEquals(resetOffsetCount, 0);
+    }
+
+    // This test is for testing the read(byte[] buf, int off, int len) method can work properly
+    // on the offset not 0.
+    @Test
+    public void testReadTillLacWithSmallBuffer() throws Exception {
+        // simulate last data block read.
+        int ledgerId = 1;
+        int entrySize = 8;
+        int lac = 89;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+
+        // set block size equals to (header + lac_entry) size.
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * (entrySize + 4 + 8);
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
+
+        // verify get methods
+        assertEquals(inputStream.getLedger(), readHandle);
+        assertEquals(inputStream.getStartEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        // verify read inputStream
+        // 1. read header. 128
+        byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+        // read twice to test the offset not 0 case
+        int ret = inputStream.read(headerB, 0, 66);
+        assertEquals(ret, 66);
+        ret = inputStream.read(headerB, 66, headerB.length - 66);
+        assertEquals(headerB.length - 66, ret);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
+        assertEquals(headerRead.getBlockLength(), blockSize);
+        assertEquals(headerRead.getFirstEntryId(), 0);
+
+        byte[] entryData = new byte[entrySize];
+        Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding
+
+        // 2. read Ledger entries. 96 * 20
+        IntStream.range(0, expectedEntryCount).forEach(i -> {
+            try {
+                byte lengthBuf[] = new byte[4];
+                byte entryIdBuf[] = new byte[8];
+                byte content[] = new byte[entrySize];
+
+                int read = inputStream.read(lengthBuf, 0, 4);
+                assertEquals(read, 4);
+                read = inputStream.read(entryIdBuf, 0, 8);
+                assertEquals(read, 8);
+
+                Random random = new Random(System.currentTimeMillis());
+                int o = 0;
+                int totalRead = 0;
+                int maxReadTime = 10;
+                while (o != content.length) {
+                    int r;
+                    if (maxReadTime-- == 0) {
+                        r = entrySize - o;
+                    } else {
+                        r = random.nextInt(entrySize - o);
+                    }
+                    read = inputStream.read(content, o, r);
+                    totalRead += read;
+                    o += r;
+                }
+                assertEquals(totalRead, entrySize);
+
+                assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
+                assertEquals(i, Longs.fromByteArray(entryIdBuf));
+                assertArrayEquals(entryData, content);
+            } catch (Exception e) {
+                fail("meet exception", e);
+            }
+        });
+
+        // 3. should have no padding
+        int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);

Review Comment:
   This only computes the value in reverse; maybe it should also check that `dataBlockFullOffset == blockSize` in BlockAwareSegmentInputStreamImpl.





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869787607


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {

Review Comment:
   The header's max size is 128 bytes. The configuration already limits the block size: `Max block size in bytes. (64MB by default, 5MB minimum)`. So I think this check is unnecessary.





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861426866


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +211,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();

Review Comment:
   done





[GitHub] [pulsar] horizonzy commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1119190999

   > @hangc0276 @horizonzy Could you please take another look as well? Thanks
   
   Fine. Could you rebase onto master?




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1109217240

   /pulsarbot run-failure-checks




[GitHub] [pulsar] eolivelli commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r860669252


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +211,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();

Review Comment:
   Is it worth reporting the values, e.g. `IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length)`?



##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -226,7 +236,12 @@ public void testHaveEndPadding() throws Exception {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());

Review Comment:
   Do we have a test in which the offset is not 0?
   The risk is that it works only with offset 0 (and that we are not handling the offset correctly).



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -186,6 +275,10 @@ public void close() throws IOException {
             entriesByteBuf.forEach(buf -> buf.release());
             entriesByteBuf.clear();
         }
+        if (paddingBuf.refCnt() != 0) {

Review Comment:
   This is very dangerous.
   We are blindly decreasing the ref count.
   We should decrease the ref count only if we incremented it, and the same number of times.



##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -256,13 +280,35 @@ public void testHaveEndPadding() throws Exception {
         int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);
         assertEquals(left, 5);
         byte padding[] = new byte[left];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {

Review Comment:
   Is it possible that this returns 0?
   -1 means EOF, but 0 means a dummy read that read nothing.
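
To make the 0 versus -1 distinction concrete, here is a minimal sketch of a fill loop over a plain JDK stream. Under the `InputStream` contract a read with `len > 0` either stores at least one byte or returns -1 at end of stream, and a read with `len == 0` returns 0, so a loop that keeps calling `read` with `buf.length - offset` sees 0 once the buffer is full. The class and method names are hypothetical and not taken from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of the PR.
class ReadLoopDemo {
    // Reads until buf is full or the stream ends; returns the number of bytes stored.
    static int fill(InputStream in, byte[] buf) throws IOException {
        int offset = 0;
        while (offset < buf.length) {              // never issue a zero-length read
            int ret = in.read(buf, offset, buf.length - offset);
            if (ret < 0) {                          // -1: end of stream
                break;
            }
            if (ret == 0) {                         // a conforming stream should not do this
                throw new IOException("read returned 0 for a non-empty request");
            }
            offset += ret;
        }
        return offset;
    }

    // Convenience wrapper over in-memory data so callers need no checked exception.
    static int fillFromBytes(byte[] source, int bufSize) {
        try {
            return fill(new ByteArrayInputStream(source), new byte[bufSize]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Guarding the loop on `offset < buf.length` instead of on `ret > 0` keeps "buffer full" and "nothing read" from being confused with each other.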





[GitHub] [pulsar] dlg99 commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861197591


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +211,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();

Review Comment:
   IllegalArgumentException? Otherwise we might as well skip this check and let the code below throw the NPE.
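
For context on the exception type: the `InputStream.read(byte[], int, int)` documentation specifies `NullPointerException` for a null array and `IndexOutOfBoundsException` for a bad offset or length, so keeping NPE matches the JDK contract. The sketch below shows one way such a check could look, with the offending values included in the message as suggested earlier in the review. The class and method names are hypothetical.

```java
import java.util.Objects;

// Hypothetical helper, not part of the PR.
class ReadArgsCheck {
    // Validates (b, off, len) following the documented InputStream.read contract.
    static void check(byte[] b, int off, int len) {
        Objects.requireNonNull(b, "The given bytes are null");
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException(
                    "off=" + off + ", len=" + len + ", b.length=" + b.length);
        }
    }

    // Demonstration helper: returns the exception message, or null when the arguments are valid.
    static String messageFor(byte[] b, int off, int len) {
        try {
            check(b, off, len);
            return null;
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }
}
```

Writing the range test as `len > b.length - off` rather than `off + len > b.length` avoids integer overflow when both values are large.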





[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1107964974

   /pulsarbot run-failure-checks




[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845940276


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);

Review Comment:
   Why do we need to check that? The padding is part of the block.





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845924317


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);
+            for (int i = 0; i < readEntries.length; i++) {
+                b[offset + i] = readEntries[i];
+                readBytes++;
+            }
+            return readBytes;
+        }
+
+        // reached end
+        return -1;

Review Comment:
   If the dataBlockHeaderStream has not been fully read yet, the read consumes only the header information and does not continue into the body.
   
   All the current tests read the header first and then read the body.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868875737


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {

Review Comment:
   ~~My mistake, this won't happen; please ignore.~~





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869455058


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -87,6 +94,50 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.topicName = ledgerName;
     }
 
+    private ByteBuf readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
+            // always read from the first ByteBuf in the list, once read all of its content remove it.
+            ByteBuf entryByteBuf = entriesByteBuf.get(0);
+            int readableBytes = entryByteBuf.readableBytes();
+            int read = Math.min(readableBytes, len);
+            ByteBuf buf = entryByteBuf.slice(currentOffset, read);
+            buf.retain();
+            currentOffset += read;
+            entryByteBuf.readerIndex(currentOffset);
+            bytesReadOffset += read;
+
+            if (entryByteBuf.readableBytes() == 0) {
+                entryByteBuf.release();
+                entriesByteBuf.remove(0);
+                blockEntryCount++;
+                currentOffset = 0;
+            }
+
+            return buf;
+        } else {
+            // no space for a new entry or there are no more entries
+            // set data block full, return end padding
+            if (dataBlockFullOffset == blockSize) {

Review Comment:
   Discussion: when there are no more entries to read, the stream fills in padding, but it does so one byte at a time, which looks like a return to the original logic. Should we support a multi-level buffer to make this faster and reduce the number of invocations?
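
As a rough illustration of what filling the padding in bulk could look like, the sketch below writes as many padding bytes as the caller's buffer and the block allow in a single call. It is a sketch only: the four padding bytes are copied from `BLOCK_END_PADDING` in the diff, while the class name, the parameters, and the assumption that the pattern phase is the count of padding bytes already emitted are hypothetical and may differ from the PR.

```java
// Hypothetical helper, not part of the PR.
class PaddingFill {
    // Same four bytes as BLOCK_END_PADDING in the diff (0xFE, 0xDC, 0xDE, 0xAD).
    static final byte[] PADDING = {(byte) 0xFE, (byte) 0xDC, (byte) 0xDE, (byte) 0xAD};

    // Writes up to len padding bytes into b starting at off.
    // padOffset: padding bytes already emitted for this block (assumed pattern phase).
    // remaining: padding bytes still owed before the block is complete.
    // Returns the number of bytes written.
    static int fill(byte[] b, int off, int len, int padOffset, int remaining) {
        int n = Math.min(len, remaining);
        for (int i = 0; i < n; i++) {
            b[off + i] = PADDING[(padOffset + i) % PADDING.length];
        }
        return n;
    }
}
```

With this shape, one `read(byte[], int, int)` call can return all outstanding padding at once instead of one byte per call.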





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845892578


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {

Review Comment:
   add `@NotNull` check



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private int currentOffset = 0;
+
+    private byte[] readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()

Review Comment:
   Do we need to fill the ByteBuf up to readLen?
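
One way to read this question is whether a single call should keep draining queued entries until `len` bytes are produced, instead of stopping at the first entry boundary. The sketch below shows that variant over plain byte arrays. It is not the PR's implementation; the class, its fields, and the use of `byte[]` entries in place of Netty `ByteBuf` are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical reader over queued entries, not part of the PR.
class MultiEntryReader {
    private final Deque<byte[]> entries = new ArrayDeque<>();
    private int posInHead = 0; // read position inside the first queued entry

    void add(byte[] entry) {
        entries.addLast(entry);
    }

    // Copies up to len bytes into b, crossing entry boundaries as needed.
    // Returns -1 when len > 0 and nothing is left to read.
    int read(byte[] b, int off, int len) {
        int total = 0;
        while (total < len && !entries.isEmpty()) {
            byte[] head = entries.peekFirst();
            int n = Math.min(len - total, head.length - posInHead);
            System.arraycopy(head, posInHead, b, off + total, n);
            posInHead += n;
            total += n;
            if (posInHead == head.length) {   // entry fully consumed, move to the next one
                entries.removeFirst();
                posInHead = 0;
            }
        }
        return (total > 0 || len == 0) ? total : -1;
    }
}
```

Returning after the first entry is also valid under the `InputStream` contract, since a read may return fewer bytes than requested; looping mainly reduces the number of calls the caller has to make.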



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;

Review Comment:
   +1



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {

Review Comment:
   ```
   if (b == null) {
               throw new NullPointerException();
           } else if (off < 0 || len < 0 || len > b.length - off) {
               throw new IndexOutOfBoundsException();
           } else if (len == 0) {
               return 0;
           }
   ```
   Please add the bound check



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);
+            for (int i = 0; i < readEntries.length; i++) {
+                b[offset + i] = readEntries[i];
+                readBytes++;
+            }
+            return readBytes;
+        }
+
+        // reached end
+        return -1;

Review Comment:
   If we have already read some bytes from `dataBlockHeaderStream` and the `bytesReadOffset < blockSize` check then fails, is it right to just return `-1`?



##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);
+            for (int i = 0; i < readEntries.length; i++) {
+                b[offset + i] = readEntries[i];
+                readBytes++;
+            }
+            return readBytes;
+        }
+
+        // reached end
+        return -1;

Review Comment:
   Please add a test for this case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845939865


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private int currentOffset = 0;
+
+    private byte[] readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()

Review Comment:
   That would introduce a more complex reading process. We can rely on the original logic to switch buffers. Because it is a stream, the reader always needs to check the return value to know whether the stream has been read to the end.
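
As a minimal sketch of the point about checking the return value: `InputStream.read(byte[], int, int)` may return fewer bytes than requested, so a caller loops until it has enough bytes or sees `-1`. The names below (`ReadLoopSketch`, `readUpTo`, `shortReads`) are hypothetical and are not part of Pulsar or jclouds; `shortReads` only simulates a stream that returns partial reads, and it assumes `maxChunk` is positive.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ReadLoopSketch {
    private ReadLoopSketch() {
    }

    /**
     * Reads until len bytes are stored in buf or the stream ends.
     * Returns the number of bytes actually read, which is less than
     * len only when the end of the stream was reached first.
     */
    static int readUpTo(InputStream in, byte[] buf, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
            int n = in.read(buf, off + total, len - total);
            if (n < 0) {
                break; // end of stream
            }
            total += n;
        }
        return total;
    }

    /** Hypothetical test stream that hands out at most maxChunk bytes per call. */
    static InputStream shortReads(byte[] data, int maxChunk) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, maxChunk));
            }
        };
    }
}
```

With a 10-byte source limited to 3 bytes per call, `readUpTo` asked for 16 bytes performs four partial reads, then sees `-1` and reports 10 bytes.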





[GitHub] [pulsar] zymap merged pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap merged PR #15063:
URL: https://github.com/apache/pulsar/pull/15063




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1108298714

   /pulsarbot run-failure-checks




[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1091306259

   Before the change: 
   <img width="1677" alt="Screen Shot 2022-04-07 at 15 47 58" src="https://user-images.githubusercontent.com/24502569/162155063-d16edc24-513b-4708-a102-5760779e0f4c.png">
   
   After the change:
   <img width="1644" alt="Screen Shot 2022-04-07 at 15 47 42" src="https://user-images.githubusercontent.com/24502569/162155529-d45e2d58-be99-4856-bbc0-dcb619fae09d.png">
   
   




[GitHub] [pulsar] horizonzy commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1119212409

   > @horizonzy All the required tests are passed. Why do I need to rebase to the latest master?
   
   https://github.com/apache/pulsar/pull/15063/checks?check_run_id=6224983508




[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861428242


##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -256,13 +280,35 @@ public void testHaveEndPadding() throws Exception {
         int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);
         assertEquals(left, 5);
         byte padding[] = new byte[left];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {

Review Comment:
   Updated the tests. I assert the return value with another read. https://github.com/apache/pulsar/pull/15063/commits/fcb7fd4fc2bcc602edbabc844dff6e72c936d5b2#diff-1f2edfe47e5f4f0d8b1634a21eeaccf78eb36d05c55d1c02f20f146247926defR293





[GitHub] [pulsar] zymap commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1112947173

   /pulsarbot run-failure-checks




[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861427829


##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -226,7 +236,12 @@ public void testHaveEndPadding() throws Exception {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());

Review Comment:
   Added a new test to cover this.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869394711


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -87,6 +94,50 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.topicName = ledgerName;
     }
 
+    private ByteBuf readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {

Review Comment:
   `bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize` will cause the same entry to be split across two blocks. Will that lead to problems?





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869455058


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -87,6 +94,50 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.topicName = ledgerName;
     }
 
+    private ByteBuf readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
+            // always read from the first ByteBuf in the list, once read all of its content remove it.
+            ByteBuf entryByteBuf = entriesByteBuf.get(0);
+            int readableBytes = entryByteBuf.readableBytes();
+            int read = Math.min(readableBytes, len);
+            ByteBuf buf = entryByteBuf.slice(currentOffset, read);
+            buf.retain();
+            currentOffset += read;
+            entryByteBuf.readerIndex(currentOffset);
+            bytesReadOffset += read;
+
+            if (entryByteBuf.readableBytes() == 0) {
+                entryByteBuf.release();
+                entriesByteBuf.remove(0);
+                blockEntryCount++;
+                currentOffset = 0;
+            }
+
+            return buf;
+        } else {
+            // no space for a new entry or there are no more entries
+            // set data block full, return end padding
+            if (dataBlockFullOffset == blockSize) {

Review Comment:
   Discuss: when there are no more entries to read, the padding is filled in one byte at a time. Do we need to support a multi-level buffer to make it faster?
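
A rough sketch of what filling the padding in bulk could look like, rather than returning one padding byte per `read()` call. Everything here is an assumption for illustration: the pattern bytes are placeholders, and the names `PaddingFillSketch`, `fillPadding`, `padOffset`, and `padRemaining` are hypothetical. The actual padding constant and offset bookkeeping are whatever `BlockAwareSegmentInputStreamImpl` defines.

```java
final class PaddingFillSketch {
    // Placeholder repeating pattern, used here only for illustration.
    static final byte[] PATTERN = {(byte) 0xFE, (byte) 0xDC, (byte) 0xDE, (byte) 0xAD};

    private PaddingFillSketch() {
    }

    /**
     * Writes up to len padding bytes into b, starting at index off.
     *
     * @param padOffset    number of padding bytes already emitted, so the
     *                     repeating pattern continues where it stopped
     * @param padRemaining number of padding bytes still owed to the block
     * @return the number of bytes written, min(len, padRemaining)
     */
    static int fillPadding(byte[] b, int off, int len, int padOffset, int padRemaining) {
        int n = Math.min(len, padRemaining);
        for (int i = 0; i < n; i++) {
            b[off + i] = PATTERN[(padOffset + i) % PATTERN.length];
        }
        return n;
    }
}
```

The caller would add the returned count to its read offset, exactly as it would for bytes copied from an entry.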





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869467009


##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -206,8 +209,18 @@ public CompletableFuture<Void> closeAsync() {
         }
     }
 
+    @DataProvider(name = "useBufferRead")
+    public static Object[][] useBufferRead() {
+        return new Object[][]{
+            {Boolean.TRUE},
+            {Boolean.FALSE}
+        };
+    }
+
+//    @Test(dataProvider = "useBufferRead")

Review Comment:
   It looks like you forgot to revert this after testing.





[GitHub] [pulsar] horizonzy commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1122629053

   Please add a test for the hybrid read path (mixing batch reads and single-byte reads); it should work well.




[GitHub] [pulsar] mattisonchao closed pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
mattisonchao closed pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger
URL: https://github.com/apache/pulsar/pull/15063




[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r847853847


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);
+            for (int i = 0; i < readEntries.length; i++) {
+                b[offset + i] = readEntries[i];
+                readBytes++;
+            }
+            return readBytes;
+        }
+
+        // reached end
+        return -1;

Review Comment:
   Yes. There is a test here: https://github.com/apache/pulsar/pull/15063/files#diff-1f2edfe47e5f4f0d8b1634a21eeaccf78eb36d05c55d1c02f20f146247926defR681





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845915738


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -76,6 +78,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private int currentOffset = 0;
+
+    private byte[] readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);

Review Comment:
   No. We can make it configurable in another PR.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r845921158


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);

Review Comment:
   Should we check here whether the result is BLOCK_END_PADDING_BYTES?





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r846008392


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -143,6 +191,41 @@ private int readEntries() throws IOException {
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= readLen;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            byte[] readEntries = readEntries(readLen);
+            for (int i = 0; i < readEntries.length; i++) {
+                b[offset + i] = readEntries[i];
+                readBytes++;
+            }
+            return readBytes;
+        }
+
+        // reached end
+        return -1;

Review Comment:
   If the requested length is 1024 and the header is 512 bytes, it will continue reading the body to fill the remaining 512 bytes, right?
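
To make the question concrete, here is a simplified model of a single `read(b, off, len)` call that drains the header first and then continues into the body. It uses two `ByteArrayInputStream`s in place of the header stream and the ledger entries, and all names (`HeaderThenBodySketch`, `header`, `body`) are hypothetical. It is a sketch of the two-phase idea only; the implementation under review may legitimately return a short read instead of filling the whole request.

```java
import java.io.ByteArrayInputStream;

final class HeaderThenBodySketch {
    private final ByteArrayInputStream header;
    private final ByteArrayInputStream body;

    HeaderThenBodySketch(byte[] headerBytes, byte[] bodyBytes) {
        this.header = new ByteArrayInputStream(headerBytes);
        this.body = new ByteArrayInputStream(bodyBytes);
    }

    /** Returns the number of bytes copied into b, or -1 once both parts are exhausted. */
    int read(byte[] b, int off, int len) {
        if (len == 0) {
            return 0;
        }
        int offset = off;
        int remaining = len;
        int total = 0;

        // Phase 1: drain whatever is left of the header.
        if (header.available() > 0) {
            int n = header.read(b, offset, remaining);
            offset += n;
            remaining -= n; // subtract the bytes read, not the whole remaining length
            total += n;
        }
        if (remaining == 0) {
            return total;
        }

        // Phase 2: continue into the body with the space that is left.
        int n = body.read(b, offset, remaining);
        if (n > 0) {
            total += n;
        }
        return total > 0 ? total : -1;
    }
}
```

With a 4-byte header and a 6-byte body, a request for 8 bytes returns 8 (4 header bytes followed by 4 body bytes), the next request returns the remaining 2, and the one after that returns `-1`.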





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861427743


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -186,6 +275,10 @@ public void close() throws IOException {
             entriesByteBuf.forEach(buf -> buf.release());
             entriesByteBuf.clear();
         }
+        if (paddingBuf.refCnt() != 0) {

Review Comment:
   My bad. That was test code that I pushed accidentally. The root cause is that the close method is called twice.
   The close method is triggered twice in the BlobStoreManagedLedgerOffloader#offload method: the stream is a resource of the try-with-resources block, which calls close on it, and, as I found through debugging, writeBlobStore.uploadMultipartPart in the offload method also triggers close.
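
One common way to make a double close harmless is a guard flag, so that resources are released only on the first call. The following is a sketch of that general pattern and not the change made in this PR; `IdempotentCloseSketch` and `releaseCount` are hypothetical, and the counter merely stands in for releasing pooled buffers.

```java
import java.io.Closeable;

final class IdempotentCloseSketch implements Closeable {
    private boolean closed = false;
    private int releaseCount = 0; // stands in for releasing pooled buffers

    @Override
    public synchronized void close() {
        if (closed) {
            return; // second and later calls are no-ops
        }
        closed = true;
        releaseCount++; // release resources exactly once
    }

    /** Number of times the underlying resources were released. */
    synchronized int releaseCount() {
        return releaseCount;
    }
}
```

If such a stream is closed once explicitly and once more by a try-with-resources block, `releaseCount()` still reports 1.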





[GitHub] [pulsar] zymap commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r861428437


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +211,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();

Review Comment:
   I think throwing an NPE is OK as well? I added information to the exception.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868875737


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {

Review Comment:
   My fault, that won't happen; please ignore.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869394711


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -87,6 +94,50 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
         this.topicName = ledgerName;
     }
 
+    private ByteBuf readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {

Review Comment:
   We don't need to check every time; maybe we only need to check when the first element of entriesByteBuf changes.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r868883170


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java:
##########
@@ -161,6 +212,45 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {

Review Comment:
   Do we need this check? If blockSize is smaller than the header size to begin with, bytesReadOffset will already be greater than blockSize once the header stream has been read, so the entry data will never be read.
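
To make the edge case concrete, here is a minimal sketch of the same read pattern. It is not the PR's code: `HeaderGuardSketch` and its two in-memory streams are hypothetical stand-ins for the header stream and the ledger entries, and only the `bytesReadOffset < blockSize` guard is taken from the diff above.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Hypothetical stand-in for the block stream: a header followed by entry data.
final class HeaderGuardSketch extends InputStream {
    private final ByteArrayInputStream header;
    private final ByteArrayInputStream entries;
    private final int blockSize;
    private int bytesReadOffset = 0;

    HeaderGuardSketch(byte[] header, byte[] entries, int blockSize) {
        this.header = new ByteArrayInputStream(header);
        this.entries = new ByteArrayInputStream(entries);
        this.blockSize = blockSize;
    }

    @Override
    public int read() {
        byte[] one = new byte[1];
        return read(one, 0, 1) == 1 ? (one[0] & 0xFF) : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (len == 0) {
            return 0;
        }
        int readBytes = 0;
        // The header is always drained first, regardless of blockSize.
        if (header.available() > 0) {
            int n = header.read(b, off, len);
            readBytes += n;
            bytesReadOffset += n;
        }
        if (readBytes == len) {
            return readBytes;
        }
        // Same guard as in the diff: entries are read only while the
        // offset is still inside the block.
        if (bytesReadOffset < blockSize) {
            int want = Math.min(len - readBytes, blockSize - bytesReadOffset);
            int n = entries.read(b, off + readBytes, want);
            if (n > 0) {
                readBytes += n;
                bytesReadOffset += n;
            }
        }
        return readBytes == 0 ? -1 : readBytes;
    }
}
```

With an 8-byte header and a blockSize of 4, a single bulk read returns the 8 header bytes and the guard then skips the entries; with a blockSize of 12, the same call also returns 4 bytes of entry data.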





[GitHub] [pulsar] horizonzy commented on pull request #15063: [improve][tiered storage] Reduce cpu usage when offloading the ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#issuecomment-1139171036

   👍

