You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/06/07 13:30:37 UTC

[pulsar] branch branch-2.9 updated: [improve][tiered storage] Reduce cpu usage when offloading the ledger (#15063)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 12ee3f56072 [improve][tiered storage] Reduce cpu usage when offloading the ledger (#15063)
12ee3f56072 is described below

commit 12ee3f56072123e99179015a50df60a7492d6a41
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Jun 6 16:31:14 2022 +0800

    [improve][tiered storage] Reduce cpu usage when offloading the ledger (#15063)
    
    * [improve][tiered storage] Reduce CPU usage when offloading the ledger
    ---
    
    *Motivation*
    
    When offloading a ledger, BlockAwareSegmentInputStreamImpl wraps the
    ledger handle so that its entries can be consumed as a stream. jclouds
    then reads the stream as the payload and uploads it to the storage.
    In the jclouds implementation, the stream is read through a buffer:
    https://github.com/apache/jclouds/blob/36f351cd18925d2bb27bf7ad2c5d75e555da377a/core/src/main/java/org/jclouds/io/ByteStreams2.java#L68
    
    In the current offload implementation, only the single-byte read() is
    overridden, so filling that buffer calls read() once per byte before
    the data is returned.
    After implementing read(byte[] b, int off, int len), CPU usage dropped
    by almost 10%.
    
    *Modifications*
    
    - Add read(byte[] b, int off, int len) implementation in the BlockAwareSegmentInputStreamImpl
    
    (cherry picked from commit 938ab7befc57a23e5a2bcb0f8bfe5c714c4d0018)
---
 .../impl/BlockAwareSegmentInputStreamImpl.java     | 117 +++++++-
 .../impl/BlockAwareSegmentInputStreamTest.java     | 328 +++++++++++++++++++--
 2 files changed, 410 insertions(+), 35 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index a4ffdea6509..b69f9f5e785 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static com.google.common.base.Preconditions.checkState;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import java.io.IOException;
@@ -27,6 +28,7 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -44,6 +46,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
     private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
 
     static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
+    static final byte[] BLOCK_END_PADDING_BYTES =  Ints.toByteArray(0xFEDCDEAD); // BLOCK_END_PADDING as bytes
+    // Reusable scratch buffer (capacity 128) that readEntries(int) fills with end padding; released by close().
+    private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128);
 
     private final ReadHandle ledger;
     private final long startEntryId;
@@ -65,6 +70,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
     static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
     // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
     private List<ByteBuf> entriesByteBuf = null;
+    private int currentOffset = 0; // position inside the entry buffer at the head of entriesByteBuf
+    private final AtomicBoolean close = new AtomicBoolean(false); // guards close() so cleanup runs at most once
+
 
     public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
         this.ledger = ledger;
@@ -76,6 +84,52 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private ByteBuf readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once the buffered entries are drained, fetch the next batch from the ledger (if any remain)
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
+            // always read from the first ByteBuf in the list; once it is fully consumed, release and remove it.
+            ByteBuf entryByteBuf = entriesByteBuf.get(0);
+            int readableBytes = entryByteBuf.readableBytes();
+            int read = Math.min(readableBytes, len);
+            // readRetainedSlice reads from the buffer's own reader index (the index readableBytes() is
+            // based on) and advances it; the returned slice is retained and released by the caller.
+            ByteBuf buf = entryByteBuf.readRetainedSlice(read);
+            currentOffset = entryByteBuf.readerIndex();
+            bytesReadOffset += read;
+
+            if (entryByteBuf.readableBytes() == 0) {
+                entryByteBuf.release();
+                entriesByteBuf.remove(0);
+                blockEntryCount++;
+                currentOffset = 0;
+            }
+
+            return buf;
+        } else {
+            // the next entry does not fit in the block, or there are no more entries: record where
+            // the data ended (first time only) and return up to paddingBuf.capacity() padding bytes
+            if (dataBlockFullOffset == blockSize) {
+                dataBlockFullOffset = bytesReadOffset;
+            }
+            paddingBuf.clear();
+            for (int i = 0; i < Math.min(len, paddingBuf.capacity()); i++) {
+                paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset)
+                    % BLOCK_END_PADDING_BYTES.length]);
+            }
+            return paddingBuf.retain();
+        }
+    }
+
     // read ledger entries.
     private int readEntries() throws IOException {
         checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
@@ -143,6 +197,46 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // 1. copy what is left of the block header; this may satisfy the whole request
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // 2. copy from the current entry (or end padding); fewer than readLen bytes may be returned
+        if (bytesReadOffset < blockSize) {
+            readLen = Math.min(readLen, blockSize - bytesReadOffset);
+            ByteBuf readEntries = readEntries(readLen);
+            int read = readEntries.readableBytes();
+            readEntries.readBytes(b, offset, read);
+            readEntries.release();
+            readBytes += read;
+            return readBytes;
+        }
+
+        // reached end: hand back any header bytes copied above; -1 only when nothing was read
+        return readBytes > 0 ? readBytes : -1;
+    }
+
     @Override
     public int read() throws IOException {
         // reading header
@@ -162,11 +256,20 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
 
     @Override
     public void close() throws IOException {
-        super.close();
-        dataBlockHeaderStream.close();
-        if (!entriesByteBuf.isEmpty()) {
-            entriesByteBuf.forEach(buf -> buf.release());
-            entriesByteBuf.clear();
+        // close() can be called more than once: BlobStoreManagedLedgerOffloader#offload closes the
+        // stream through try-with-resources, and writeBlobStore.uploadMultipartPart in that method
+        // closes it as well. compareAndSet(false, true) succeeds only for the first caller, so the
+        // buffers (including the pooled paddingBuf) are released exactly once, on the first close(),
+        // and never twice.
+        if (close.compareAndSet(false, true)) {
+            super.close();
+            dataBlockHeaderStream.close();
+            if (!entriesByteBuf.isEmpty()) {
+                entriesByteBuf.forEach(buf -> buf.release());
+                entriesByteBuf.clear();
+            }
+            paddingBuf.clear();
+            paddingBuf.release();
         }
     }
 
@@ -185,6 +288,10 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         return blockSize;
     }
 
+    public int getDataBlockFullOffset() {
+        return dataBlockFullOffset; // where end padding began; blockSize acts as the "not full yet" value
+    }
+
     @Override
     public int getBlockEntryCount() {
         return blockEntryCount;
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
index 5cf6bd56500..0cd4bbd70a9 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
@@ -28,6 +29,7 @@ import com.google.common.primitives.Longs;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -44,6 +46,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
@@ -206,8 +209,16 @@ public class BlockAwareSegmentInputStreamTest {
         }
     }
 
-    @Test
-    public void testHaveEndPadding() throws Exception {
+    @DataProvider(name = "useBufferRead")
+    public static Object[][] useBufferRead() {
+        return new Object[][]{
+            {Boolean.TRUE}, // explicit read(b, off, len) calls whose return values are asserted
+            {Boolean.FALSE} // ByteStreams.readFully / read(byte[]) path
+        };
+    }
+
+    @Test(dataProvider = "useBufferRead")
+    public void testHaveEndPadding(boolean useBufferRead) throws Exception {
         int ledgerId = 1;
         int entrySize = 8;
         int lac = 160;
@@ -226,7 +237,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -240,9 +256,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -256,13 +281,36 @@ public class BlockAwareSegmentInputStreamTest {
         int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);
         assertEquals(left, 5);
         byte padding[] = new byte[left];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {
+                offset += ret;
+            }
+            assertEquals(inputStream.read(padding, 0, padding.length), -1);
+        } else {
+            int len = left;
+            int offset = 0;
+            byte[] buf = new byte[4];
+            while (len > 0) {
+                int ret = inputStream.read(buf);
+                for (int i = 0; i < ret; i++) {
+                    padding[offset++] = buf[i];
+                }
+                len -= ret;
+            }
+        }
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
             assertEquals(Integer.toHexString(paddingBuf.readInt()),
                          Integer.toHexString(0xFEDCDEAD)));
 
         // 4. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
@@ -272,8 +320,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testNoEndPadding() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testNoEndPadding(boolean useBufferRead) throws Exception {
         int ledgerId = 1;
         int entrySize = 8;
         int lac = 120;
@@ -293,7 +341,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -307,9 +360,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -324,6 +386,11 @@ public class BlockAwareSegmentInputStreamTest {
         assertEquals(left, 0);
 
         // 4. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
@@ -333,8 +400,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testReadTillLac() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testReadTillLac(boolean useBufferRead) throws Exception {
         // simulate last data block read.
         int ledgerId = 1;
         int entrySize = 8;
@@ -354,7 +421,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -368,9 +440,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -385,6 +466,11 @@ public class BlockAwareSegmentInputStreamTest {
         assertEquals(left, 0);
 
         // 4. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
@@ -394,8 +480,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testNoEntryPutIn() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testNoEntryPutIn(boolean useBufferRead) throws Exception {
         // simulate first entry size over the block size budget, it shouldn't be added.
         // 2 entries, each with bigger size than block size, so there should no entry added into block.
         int ledgerId = 1;
@@ -416,7 +502,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -424,13 +515,36 @@ public class BlockAwareSegmentInputStreamTest {
 
         // 2. since no entry put in, it should only get padding after header.
         byte padding[] = new byte[blockSize - DataBlockHeaderImpl.getDataStartOffset()];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {
+                offset += ret;
+            }
+            assertEquals(inputStream.read(padding, 0, padding.length), -1);
+        } else {
+            int len = padding.length;
+            int offset = 0;
+            byte[] buf = new byte[4];
+            while (len > 0) {
+                int ret = inputStream.read(buf);
+                for (int i = 0; i < ret; i++) {
+                    padding[offset++] = buf[i];
+                }
+                len -= ret;
+            }
+        }
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
             assertEquals(Integer.toHexString(paddingBuf.readInt()),
                          Integer.toHexString(0xFEDCDEAD)));
 
         // 3. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), 0);
@@ -440,8 +554,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testPaddingOnLastBlock() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception {
         int ledgerId = 1;
         int entrySize = 1000;
         int lac = 0;
@@ -460,7 +574,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -474,9 +593,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -490,13 +618,36 @@ public class BlockAwareSegmentInputStreamTest {
         int consumedBytes = DataBlockHeaderImpl.getDataStartOffset()
             + expectedEntryCount * (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE);
         byte padding[] = new byte[blockSize - consumedBytes];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {
+                offset += ret;
+            }
+            assertEquals(inputStream.read(padding, 0, padding.length), -1);
+        } else {
+            int len = blockSize - consumedBytes;
+            int offset = 0;
+            byte[] buf = new byte[4];
+            while (len > 0) {
+                int ret = inputStream.read(buf);
+                for (int i = 0; i < ret; i++) {
+                    padding[offset++] = buf[i];
+                }
+                len -= ret;
+            }
+        }
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
                 assertEquals(Integer.toHexString(paddingBuf.readInt()),
                              Integer.toHexString(0xFEDCDEAD)));
 
         // 3. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), 1);
@@ -530,4 +681,121 @@ public class BlockAwareSegmentInputStreamTest {
         }
     }
 
+    @Test
+    public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException {
+        int ledgerId = 1;
+        int entrySize = 10000;
+        int lac = 0;
+
+        Random r = new Random(0);
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt());
+
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2;
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int bytesRead = 0;
+        int ret;
+        int offset = 0;
+        int resetOffsetCount = 0;
+        byte[] buf = new byte[1024];
+        while ((ret = inputStream.read(buf, offset, buf.length - offset)) > 0) {
+            bytesRead += ret;
+            int currentOffset = offset;
+            offset = (offset + ret) % buf.length;
+            if (offset < currentOffset) {
+                resetOffsetCount++;
+            }
+        }
+        assertEquals(bytesRead, blockSize); // the whole block was delivered before a non-positive return
+        assertNotEquals(resetOffsetCount, 0);
+        inputStream.close();
+    }
+
+    // Verifies that read(byte[] buf, int off, int len) works with a non-zero offset and with
+    // short, randomly sized reads that stop in the middle of an entry.
+    @Test
+    public void testReadTillLacWithSmallBuffer() throws Exception {
+        // simulate last data block read.
+        int ledgerId = 1;
+        int entrySize = 8;
+        int lac = 89;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+
+        // set block size equals to (header + lac_entry) size.
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * (entrySize + 4 + 8);
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
+
+        // verify get methods
+        assertEquals(inputStream.getLedger(), readHandle);
+        assertEquals(inputStream.getStartEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        // verify read inputStream
+        // 1. read header. 128
+        byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+        // read the header in two parts so that the second read starts at a non-zero offset
+        int ret = inputStream.read(headerB, 0, 66);
+        assertEquals(ret, 66);
+        ret = inputStream.read(headerB, 66, headerB.length - 66);
+        assertEquals(headerB.length - 66, ret);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
+        assertEquals(headerRead.getBlockLength(), blockSize);
+        assertEquals(headerRead.getFirstEntryId(), 0);
+
+        byte[] entryData = new byte[entrySize];
+        Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding
+
+        // 2. read ledger entries: 90 entries of 20 bytes (4 size + 8 entry id + 8 payload)
+        IntStream.range(0, expectedEntryCount).forEach(i -> {
+            try {
+                byte lengthBuf[] = new byte[4];
+                byte entryIdBuf[] = new byte[8];
+                byte content[] = new byte[entrySize];
+
+                int read = inputStream.read(lengthBuf, 0, 4);
+                assertEquals(read, 4);
+                read = inputStream.read(entryIdBuf, 0, 8);
+                assertEquals(read, 8);
+
+                Random random = new Random(i); // fixed seed per entry keeps failures reproducible
+                int o = 0;
+                int totalRead = 0;
+                int maxReadTime = 10;
+                while (o != content.length) {
+                    int r;
+                    if (maxReadTime-- <= 0) {
+                        r = entrySize - o;
+                    } else {
+                        r = random.nextInt(entrySize - o);
+                    }
+                    read = inputStream.read(content, o, r);
+                    totalRead += read;
+                    o += read; // advance by what was actually read: InputStream allows short reads
+                }
+                assertEquals(totalRead, entrySize);
+
+                assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
+                assertEquals(i, Longs.fromByteArray(entryIdBuf));
+                assertArrayEquals(entryData, content);
+            } catch (Exception e) {
+                fail("meet exception", e);
+            }
+        });
+
+        // 3. should have no padding
+        int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);
+        assertEquals(left, 0);
+        assertEquals(inputStream.getBlockSize(), inputStream.getDataBlockFullOffset());
+
+        // 4. reach end.
+        byte[] b = new byte[4];
+        ret = inputStream.read(b, 0, 4);
+        assertEquals(ret, -1);
+
+        assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
+        assertEquals(inputStream.getBlockEntryBytesCount(), entrySize * expectedEntryCount);
+        assertEquals(inputStream.getEndEntryId(), expectedEntryCount - 1);
+
+        inputStream.close();
+    }
 }