You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2024/02/07 18:33:06 UTC

(bookkeeper) branch master updated: Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 (#4196)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c373f7e7b Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 (#4196)
9c373f7e7b is described below

commit 9c373f7e7b5b62d57dc5d295039229c2461ae518
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Feb 7 10:33:00 2024 -0800

    Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 (#4196)
    
    * Add a test that reproduces a bug in checksum calculation
    
    * Revert "Fixed unnecessary copy to heap (#2701)" changes to ByteBufList
    
    This partially reverts commit 3c9c7102538909fd3764ea7314e7618d6d9458fd.
    
    * Remove CompositeBuffer unwrapping in DigestManager
    
    * Rename update -> internalUpdate so that unwrapping logic could be added to update
    
    * Remove unnecessary unwrapping logic in Java9IntHash
    
    * Add safe way to handle CompositeByteBuf
    
    * Add license header
    
    * Fix checkstyle
    
    * Refactor ByteBuf visitor solution
    
    * Fix checkstyle
    
    * Reformat
    
    * Refactor recursive visiting
    
    * Revisit equals, hashCode and toString
    
    * Refactor test case
    
    * Add support for UnpooledHeapByteBuf.getBytes which passes an array
    
    * Add support for visiting buffers backed by byte[] arrays
    
    - getBytes calls setBytes with a byte[] argument for
      heap ByteBufs
    
    * Move ByteBufVisitor to org.apache.bookkeeper.util package
    
    * Update javadoc
    
    * Refactor to use stateless visitor so that instance can be shared
    
    * Improve test so that a single scenario can be used for debugging
    
    * Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x)
    
    - Java9IntHash uses private methods from java.util.zip.CRC32C class,
      updateBytes and updateDirectByteBuffer.
      When inspecting the use and interface contract, it doesn't match
      how it is used in Java9IntHash. This PR addresses that by introducing
      a separate initial value for initializing the accumulated value
      so that the initial value could match the logic in
      java.util.zip.CRC32C.reset method. There's also a separate
      method for finalizing the accumulated value into a final
      checksum value. This is to match the java.util.zip.CRC32C.getValue
      method's logic (uses bitwise complement operator ~).
    
    - With a quick glance, it might appear that the previous logic is similar.
      However it isn't since I have a failing test which gets fixed with this
      change. I haven't yet added the Java9IntHash level unit test case to prove how
      it differs. It must be related to integer value overflow. For the CRC32C function,
      I believe it means that it cannot be assumed in all cases that
      func(x) == ~func(~x). That's the assumption that the previous code was making.
      It probably applies for many inputs, but not all. It would break in overflow
      cases.
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * Fix missing depth increment that prevents StackOverflowException
    
    * Properly handle the depth increase and decrease
    
    * Remove unnecessary condition
    
    * Use more efficient way to read bytes to the target array
    
    * Don't use ByteBufVisitor if it's not necessary
    
    * Revert "Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x)"
    
    This reverts commit 272e962930a31cbc237c5e7c0bd0c93213520ba4.
    
    * Fix issue in resume byte[] version that was added
    
    - input and output should be complemented. explanation has been added to the
      resume ByteBuf method
    
    * Polish ByteBufVisitor
    
    - reuse GetBytesCallbackByteBuf instance for handling the root ByteBuf instance
    
    * Use extracted method
    
    * Fix bug with array handling
    
    * Polish ByteBufVisitor
    
    * Optimize the buffer copying in the case where array or memory address cannot be accessed
    
    - read-only buffers will need to be copied before reading
      - use ByteBuf.copy for direct buffers with pooled allocator when the algorithm can accept
        a memory address buffer
    - use the 64kB threadlocal byte[] buffer for copying all other inputs
    
    * Check if memory address is accepted
    
    * Improve comments about complement (current = ~current) in resume
    
    * Print thread dump when build is cancelled
    
    * Filter empty buffers and arrays in ByteBufVisitor
---
 .github/workflows/bk-ci.yml                        |    4 +
 .../proto/checksum/CRC32CDigestManager.java        |   12 +-
 .../proto/checksum/CRC32DigestManager.java         |   14 +-
 .../bookkeeper/proto/checksum/DigestManager.java   |   94 +-
 .../proto/checksum/DirectMemoryCRC32Digest.java    |   18 +-
 .../proto/checksum/DummyDigestManager.java         |   12 +-
 .../proto/checksum/MacDigestManager.java           |   13 +-
 .../proto/checksum/StandardCRC32Digest.java        |    5 +
 .../org/apache/bookkeeper/util/ByteBufList.java    |   36 +-
 .../org/apache/bookkeeper/util/ByteBufVisitor.java | 1132 ++++++++++++++++++++
 .../CompositeByteBufUnwrapBugReproduceTest.java    |  280 +++++
 .../apache/bookkeeper/util/ByteBufListTest.java    |   34 -
 .../circe/checksum/Crc32cIntChecksum.java          |   24 +-
 .../com/scurrilous/circe/checksum/IntHash.java     |    4 +
 .../scurrilous/circe/checksum/Java8IntHash.java    |   10 +
 .../scurrilous/circe/checksum/Java9IntHash.java    |   48 +-
 .../com/scurrilous/circe/checksum/JniIntHash.java  |   10 +
 17 files changed, 1618 insertions(+), 132 deletions(-)

diff --git a/.github/workflows/bk-ci.yml b/.github/workflows/bk-ci.yml
index e8d7782114..c9f9226dd8 100644
--- a/.github/workflows/bk-ci.yml
+++ b/.github/workflows/bk-ci.yml
@@ -200,6 +200,10 @@ jobs:
           path: surefire-reports
           retention-days: 7
 
+      - name: print JVM thread dumps when cancelled
+        if: cancelled()
+        run: ./dev/ci-tool print_thread_dumps
+
   integration-tests:
     name: Integration Tests
     runs-on: ubuntu-latest
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
index 1fb3d47b3e..5343357198 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
@@ -46,7 +46,17 @@ class CRC32CDigestManager extends DigestManager {
     }
 
     @Override
-    int update(int digest, ByteBuf data, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf data, int offset, int len) {
         return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
     }
+
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
+        return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len);
+    }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return Crc32cIntChecksum.acceptsMemoryAddressBuffer();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index 21be2651a7..0d18312cfc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -34,6 +34,7 @@ class CRC32DigestManager extends DigestManager {
         long getValueAndReset();
 
         void update(ByteBuf buf, int offset,  int len);
+        void update(byte[] buffer, int offset, int len);
     }
 
     private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
@@ -62,14 +63,25 @@ class CRC32DigestManager extends DigestManager {
     }
 
     @Override
-    int update(int digest, ByteBuf data, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf data, int offset, int len) {
         crc.get().update(data, offset, len);
         return 0;
     }
 
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
+        crc.get().update(buffer, offset, len);
+        return 0;
+    }
+
     @Override
     boolean isInt32Digest() {
         // This is stored as 8 bytes
         return false;
     }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return DirectMemoryCRC32Digest.isSupported();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index eab33945b1..1e78e4075e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -20,10 +20,8 @@ package org.apache.bookkeeper.proto.checksum;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
-import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.security.GeneralSecurityException;
@@ -34,6 +32,7 @@ import org.apache.bookkeeper.proto.BookieProtoEncoding;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
 import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.ByteBufVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,10 +52,25 @@ public abstract class DigestManager {
     final long ledgerId;
     final boolean useV2Protocol;
     private final ByteBufAllocator allocator;
+    private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback;
 
     abstract int getMacCodeLength();
 
-    abstract int update(int digest, ByteBuf buffer, int offset, int len);
+    abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);
+
+    abstract int internalUpdate(int digest, byte[] buffer, int offset, int len);
+
+    final int update(int digest, ByteBuf buffer, int offset, int len) {
+        if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) {
+            return internalUpdate(digest, buffer, offset, len);
+        } else if (buffer.hasArray()) {
+            return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len);
+        } else {
+            UpdateContext updateContext = new UpdateContext(digest);
+            ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext);
+            return updateContext.digest;
+        }
+    }
 
     abstract void populateValueAndReset(int digest, ByteBuf buffer);
 
@@ -69,6 +83,7 @@ public abstract class DigestManager {
         this.useV2Protocol = useV2Protocol;
         this.macCodeLength = getMacCodeLength();
         this.allocator = allocator;
+        this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback();
     }
 
     public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
@@ -136,22 +151,7 @@ public abstract class DigestManager {
 
         // Compute checksum over the headers
         int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());
-
-        // don't unwrap slices
-        final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
-                ? data.unwrap() : data;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.safeRelease(data);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
-            for (int i = 0; i < cbb.numComponents(); i++) {
-                ByteBuf b = cbb.component(i);
-                digest = update(digest, b, b.readerIndex(), b.readableBytes());
-            }
-        } else {
-            digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
-        }
+        digest = update(digest, data, data.readerIndex(), data.readableBytes());
 
         populateValueAndReset(digest, buf);
 
@@ -159,11 +159,11 @@ public abstract class DigestManager {
         buf.readerIndex(0);
 
         if (isSmallEntry) {
-            buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
-            unwrapped.release();
+            buf.writeBytes(data, data.readerIndex(), data.readableBytes());
+            data.release();
             return buf;
         } else {
-            return ByteBufList.get(buf, unwrapped);
+            return ByteBufList.get(buf, data);
         }
     }
 
@@ -176,25 +176,9 @@ public abstract class DigestManager {
         headersBuffer.writeLong(length);
 
         int digest = update(0, headersBuffer, 0, METADATA_LENGTH);
-
-        // don't unwrap slices
-        final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
-                ? data.unwrap() : data;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.release(data);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
-            for (int i = 0; i < cbb.numComponents(); i++) {
-                ByteBuf b = cbb.component(i);
-                digest = update(digest, b, b.readerIndex(), b.readableBytes());
-            }
-        } else {
-            digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
-        }
+        digest = update(digest, data, data.readerIndex(), data.readableBytes());
         populateValueAndReset(digest, headersBuffer);
-
-        return ByteBufList.get(headersBuffer, unwrapped);
+        return ByteBufList.get(headersBuffer, data);
     }
 
     /**
@@ -373,4 +357,34 @@ public abstract class DigestManager {
         long length = dataReceived.readLong();
         return new RecoveryData(lastAddConfirmed, length);
     }
+
+    private static class UpdateContext {
+        int digest;
+
+        UpdateContext(int digest) {
+            this.digest = digest;
+        }
+    }
+
+    private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback<UpdateContext> {
+
+        @Override
+        public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) {
+            // recursively visit the sub buffer and update the digest
+            context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength);
+        }
+
+        @Override
+        public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) {
+            // update the digest with the array
+            context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength);
+        }
+
+        @Override
+        public boolean acceptsMemoryAddress(UpdateContext context) {
+            return DigestManager.this.acceptsMemoryAddressBuffer();
+        }
+    }
+
+    abstract boolean acceptsMemoryAddressBuffer();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
index eda223ef7f..07a2bdf464 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
@@ -50,18 +50,32 @@ class DirectMemoryCRC32Digest implements CRC32Digest {
                 crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length);
             } else if (buf.hasArray()) {
                 // Use the internal method to update from array based
-                crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length);
+                crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length);
             } else {
                 // Fallback to data copy if buffer is not contiguous
                 byte[] b = new byte[length];
                 buf.getBytes(index, b, 0, length);
-                crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length);
+                crcValue = updateArray(crcValue, b, 0, length);
             }
         } catch (IllegalAccessException | InvocationTargetException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private static int updateArray(int crcValue, byte[] buf, int offset, int length)
+            throws IllegalAccessException, InvocationTargetException {
+        return (int) updateBytes.invoke(null, crcValue, buf, offset, length);
+    }
+
+    @Override
+    public void update(byte[] buffer, int offset, int len) {
+        try {
+            crcValue = updateArray(crcValue, buffer, offset, len);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private static final Method updateByteBuffer;
     private static final Method updateBytes;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
index b15499f0cc..e2fff9bd7c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
@@ -38,7 +38,12 @@ public class DummyDigestManager extends DigestManager {
     }
 
     @Override
-    int update(int digest, ByteBuf buffer, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
+        return 0;
+    }
+
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
         return 0;
     }
 
@@ -49,4 +54,9 @@ public class DummyDigestManager extends DigestManager {
     boolean isInt32Digest() {
         return false;
     }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return false;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index c04c411c6c..f9fda5a531 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -96,13 +96,24 @@ public class MacDigestManager extends DigestManager {
     }
 
     @Override
-    int update(int digest, ByteBuf data, int offset, int len) {
+    int internalUpdate(int digest, ByteBuf data, int offset, int len) {
         mac.get().update(data.slice(offset, len).nioBuffer());
         return 0;
     }
 
+    @Override
+    int internalUpdate(int digest, byte[] buffer, int offset, int len) {
+        mac.get().update(buffer, offset, len);
+        return 0;
+    }
+
     @Override
     boolean isInt32Digest() {
         return false;
     }
+
+    @Override
+    boolean acceptsMemoryAddressBuffer() {
+        return false;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
index 3d48f0ef7d..7635e3e9f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
@@ -39,4 +39,9 @@ class StandardCRC32Digest implements CRC32Digest {
     public void update(ByteBuf buf, int offset, int len) {
         crc.update(buf.slice(offset, len).nioBuffer());
     }
+
+    @Override
+    public void update(byte[] buffer, int offset, int len) {
+        crc.update(buffer, offset, len);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index 324588d852..b363e4da63 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.util;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
@@ -133,43 +132,14 @@ public class ByteBufList extends AbstractReferenceCounted {
      * Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
      */
     public void add(ByteBuf buf) {
-        final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
-                ? buf.unwrap() : buf;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.release(buf);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            ((CompositeByteBuf) unwrapped).forEach(b -> {
-                ReferenceCountUtil.retain(b);
-                buffers.add(b);
-            });
-            ReferenceCountUtil.release(unwrapped);
-        } else {
-            buffers.add(unwrapped);
-        }
+        buffers.add(buf);
     }
 
     /**
      * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
      */
     public void prepend(ByteBuf buf) {
-        // don't unwrap slices
-        final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
-                ? buf.unwrap() : buf;
-        ReferenceCountUtil.retain(unwrapped);
-        ReferenceCountUtil.release(buf);
-
-        if (unwrapped instanceof CompositeByteBuf) {
-            CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
-            for (int i = composite.numComponents() - 1; i >= 0; i--) {
-                ByteBuf b = composite.component(i);
-                ReferenceCountUtil.retain(b);
-                buffers.add(0, b);
-            }
-            ReferenceCountUtil.release(unwrapped);
-        } else {
-            buffers.add(0, unwrapped);
-        }
+        buffers.add(0, buf);
     }
 
     /**
@@ -285,7 +255,7 @@ public class ByteBufList extends AbstractReferenceCounted {
     @Override
     protected void deallocate() {
         for (int i = 0; i < buffers.size(); i++) {
-            ReferenceCountUtil.release(buffers.get(i));
+            buffers.get(i).release();
         }
 
         buffers.clear();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java
new file mode 100644
index 0000000000..32e9c8c55a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java
@@ -0,0 +1,1132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ByteProcessor;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+
+/**
+ * This class visits the possible wrapped child buffers of a Netty {@link ByteBuf} for a given offset and length.
+ * <p>
+ * The Netty ByteBuf API does not provide a method to visit the wrapped child buffers. The
+ * {@link ByteBuf#unwrap()} method is not suitable for this purpose as it loses the
+ * {@link ByteBuf#readerIndex()} state, resulting in incorrect offset and length information.
+ * <p>
+ * Despite Netty not having a public API for visiting the sub buffers, it is possible to achieve this using
+ * the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} method. This class uses this method to visit the
+ * wrapped child buffers by providing a suitable {@link ByteBuf} implementation. This implementation supports
+ * the role of the destination buffer for the getBytes call. It requires implementing the
+ * {@link ByteBuf#setBytes(int, ByteBuf, int, int)} and {@link ByteBuf#setBytes(int, byte[], int, int)} methods
+ * and other methods required by getBytes such as {@link ByteBuf#hasArray()}, {@link ByteBuf#hasMemoryAddress()},
+ * {@link ByteBuf#nioBufferCount()} and {@link ByteBuf#capacity()}.
+ * All other methods in the internal ByteBuf implementation are not supported and will throw an exception.
+ * This is to ensure correctness and to fail fast if some ByteBuf implementation is not following the expected
+ * and supported interface contract.
+ */
+public class ByteBufVisitor {
+    private static final int DEFAULT_VISIT_MAX_DEPTH = 10;
+
+    private ByteBufVisitor() {
+        // prevent instantiation
+    }
+
+    /**
+     * This method traverses the potential nested composite buffers of the provided buffer, given a specific offset and
+     * length. The traversal continues until it encounters a buffer that is backed by an array or a memory address,
+     * which allows for the inspection of individual buffer segments without the need for data duplication.
+     * If no such wrapped buffer is found, the callback function is invoked with the original buffer, offset,
+     * and length as parameters.
+     *
+     * @param buffer   the buffer to visit
+     * @param offset   the offset for the buffer
+     * @param length   the length for the buffer
+     * @param callback the callback to call for each visited buffer
+     * @param context  the context to pass to the callback
+     */
+    public static <T> void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback<T> callback,
+                                        T context) {
+        visitBuffers(buffer, offset, length, callback, context, DEFAULT_VISIT_MAX_DEPTH);
+    }
+
+    /**
+     * The callback interface for visiting buffers.
+     * In case of a heap buffer that is backed by an byte[] array, the visitArray method is called. This
+     * is due to the internal implementation detail of the {@link ByteBuf#getBytes(int, ByteBuf, int, int)}
+     * method for heap buffers.
+     */
+    public interface ByteBufVisitorCallback<T> {
+        void visitBuffer(T context, ByteBuf visitBuffer, int visitIndex, int visitLength);
+        void visitArray(T context, byte[] visitArray, int visitIndex, int visitLength);
+        default boolean preferArrayOrMemoryAddress(T context) {
+            return true;
+        }
+        default boolean acceptsMemoryAddress(T context) {
+            return false;
+        }
+    }
+
+    /**
+     * See @{@link #visitBuffers(ByteBuf, int, int, ByteBufVisitorCallback, Object)}. This method
+     * allows to specify the maximum depth of recursion for visiting wrapped buffers.
+     */
+    public static <T> void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback<T> callback,
+                                        T context, int maxDepth) {
+        if (length == 0) {
+            // skip visiting empty buffers
+            return;
+        }
+        InternalContext<T> internalContext = new InternalContext<>();
+        internalContext.maxDepth = maxDepth;
+        internalContext.callbackContext = context;
+        internalContext.callback = callback;
+        internalContext.recursivelyVisitBuffers(buffer, offset, length);
+    }
+
+    private static final int TL_COPY_BUFFER_SIZE = 64 * 1024;
+    private static final FastThreadLocal<byte[]> TL_COPY_BUFFER = new FastThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+            return new byte[TL_COPY_BUFFER_SIZE];
+        }
+    };
+
+    private static class InternalContext<T> {
+        int depth;
+        int maxDepth;
+        ByteBuf parentBuffer;
+        int parentOffset;
+        int parentLength;
+        T callbackContext;
+        ByteBufVisitorCallback<T> callback;
+        GetBytesCallbackByteBuf<T> callbackByteBuf = new GetBytesCallbackByteBuf(this);
+
+        void recursivelyVisitBuffers(ByteBuf visitBuffer, int visitIndex, int visitLength) {
+            // visit the wrapped buffers recursively if the buffer is not backed by an array or memory address
+            // and the max depth has not been reached
+            if (depth < maxDepth && !visitBuffer.hasMemoryAddress() && !visitBuffer.hasArray()) {
+                parentBuffer = visitBuffer;
+                parentOffset = visitIndex;
+                parentLength = visitLength;
+                depth++;
+                // call getBytes to trigger the wrapped buffer visit
+                visitBuffer.getBytes(visitIndex, callbackByteBuf, 0, visitLength);
+                depth--;
+            } else {
+                passBufferToCallback(visitBuffer, visitIndex, visitLength);
+            }
+        }
+
+        void handleBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength) {
+            if (visitLength == 0) {
+                // skip visiting empty buffers
+                return;
+            }
+            if (visitBuffer == parentBuffer && visitIndex == parentOffset && visitLength == parentLength) {
+                // further recursion would cause unnecessary recursion up to the max depth of recursion
+                passBufferToCallback(visitBuffer, visitIndex, visitLength);
+            } else {
+                // use the doRecursivelyVisitBuffers method to visit the wrapped buffer, possibly recursively
+                recursivelyVisitBuffers(visitBuffer, visitIndex, visitLength);
+            }
+        }
+
+        private void passBufferToCallback(ByteBuf visitBuffer, int visitIndex, int visitLength) {
+            if (callback.preferArrayOrMemoryAddress(callbackContext)) {
+                if (visitBuffer.hasArray()) {
+                    handleArray(visitBuffer.array(), visitBuffer.arrayOffset() + visitIndex, visitLength);
+                } else if (visitBuffer.hasMemoryAddress() && callback.acceptsMemoryAddress(callbackContext)) {
+                    callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength);
+                } else if (callback.acceptsMemoryAddress(callbackContext) && visitBuffer.isDirect()
+                        && visitBuffer.alloc().isDirectBufferPooled()) {
+                    // read-only buffers need to be copied before they can be directly accessed
+                    ByteBuf copyBuffer = visitBuffer.copy(visitIndex, visitLength);
+                    callback.visitBuffer(callbackContext, copyBuffer, 0, visitLength);
+                    copyBuffer.release();
+                } else {
+                    // fallback to reading the visited buffer into the copy buffer in a loop
+                    byte[] copyBuffer = TL_COPY_BUFFER.get();
+                    int remaining = visitLength;
+                    int currentOffset = visitIndex;
+                    while (remaining > 0) {
+                        int readLen = Math.min(remaining, copyBuffer.length);
+                        visitBuffer.getBytes(currentOffset, copyBuffer, 0, readLen);
+                        handleArray(copyBuffer, 0, readLen);
+                        remaining -= readLen;
+                        currentOffset += readLen;
+                    }
+                }
+            } else {
+                callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength);
+            }
+        }
+
+        void handleArray(byte[] visitArray, int visitIndex, int visitLength) {
+            if (visitLength == 0) {
+                // skip visiting empty arrays
+                return;
+            }
+            // pass array to callback
+            callback.visitArray(callbackContext, visitArray, visitIndex, visitLength);
+        }
+    }
+
+    /**
+     * A ByteBuf implementation that can be used as the destination buffer for
+     * a {@link ByteBuf#getBytes(int, ByteBuf)} for visiting the wrapped child buffers.
+     */
+    static class GetBytesCallbackByteBuf<T> extends ByteBuf {
+        private final InternalContext<T> internalContext;
+
+        GetBytesCallbackByteBuf(InternalContext<T> internalContext) {
+            this.internalContext = internalContext;
+        }
+
+        @Override
+        public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+            internalContext.handleBuffer(src, srcIndex, length);
+            return this;
+        }
+
+        @Override
+        public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+            internalContext.handleArray(src, srcIndex, length);
+            return this;
+        }
+
+        @Override
+        public boolean hasArray() {
+            // return false so that the wrapped buffer is visited
+            return false;
+        }
+
+        @Override
+        public boolean hasMemoryAddress() {
+            // return false so that the wrapped buffer is visited
+            return false;
+        }
+
+        @Override
+        public int nioBufferCount() {
+            // return 0 so that the wrapped buffer is visited
+            return 0;
+        }
+
+        @Override
+        public int capacity() {
+            // should return sufficient capacity for the total length
+            return Integer.MAX_VALUE;
+        }
+
+        @Override
+        public ByteBuf capacity(int newCapacity) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int maxCapacity() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBufAllocator alloc() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteOrder order() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf order(ByteOrder endianness) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf unwrap() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isDirect() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isReadOnly() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf asReadOnly() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readerIndex() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readerIndex(int readerIndex) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int writerIndex() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writerIndex(int writerIndex) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setIndex(int readerIndex, int writerIndex) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readableBytes() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int writableBytes() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int maxWritableBytes() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isReadable() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isReadable(int size) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isWritable() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isWritable(int size) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf clear() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf markReaderIndex() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf resetReaderIndex() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf markWriterIndex() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf resetWriterIndex() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf discardReadBytes() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf discardSomeReadBytes() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf ensureWritable(int minWritableBytes) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int ensureWritable(int minWritableBytes, boolean force) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean getBoolean(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public byte getByte(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public short getUnsignedByte(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public short getShort(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public short getShortLE(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getUnsignedShort(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getUnsignedShortLE(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getMedium(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getMediumLE(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getUnsignedMedium(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getUnsignedMediumLE(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getInt(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getIntLE(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getUnsignedInt(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getUnsignedIntLE(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getLong(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getLongLE(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public char getChar(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public float getFloat(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public double getDouble(int index) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf getBytes(int index, ByteBuf dst) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf getBytes(int index, ByteBuf dst, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf getBytes(int index, byte[] dst) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf getBytes(int index, ByteBuffer dst) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CharSequence getCharSequence(int index, int length, Charset charset) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setBoolean(int index, boolean value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setByte(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setShort(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setShortLE(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setMedium(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setMediumLE(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setInt(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setIntLE(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setLong(int index, long value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setLongLE(int index, long value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setChar(int index, int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setFloat(int index, float value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setDouble(int index, double value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setBytes(int index, ByteBuf src) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setBytes(int index, ByteBuf src, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setBytes(int index, byte[] src) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setBytes(int index, ByteBuffer src) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int setBytes(int index, InputStream in, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf setZero(int index, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int setCharSequence(int index, CharSequence sequence, Charset charset) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean readBoolean() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public byte readByte() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public short readUnsignedByte() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public short readShort() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public short readShortLE() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readUnsignedShort() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readUnsignedShortLE() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readMedium() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readMediumLE() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readUnsignedMedium() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readUnsignedMediumLE() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readInt() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readIntLE() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long readUnsignedInt() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long readUnsignedIntLE() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long readLong() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long readLongLE() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public char readChar() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public float readFloat() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public double readDouble() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readSlice(int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readRetainedSlice(int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(ByteBuf dst) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(ByteBuf dst, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(byte[] dst) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(ByteBuffer dst) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf readBytes(OutputStream out, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readBytes(GatheringByteChannel out, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CharSequence readCharSequence(int length, Charset charset) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int readBytes(FileChannel out, long position, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf skipBytes(int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeBoolean(boolean value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeByte(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeShort(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeShortLE(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeMedium(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeMediumLE(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeInt(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeIntLE(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeLong(long value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeLongLE(long value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeChar(int value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeFloat(float value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeDouble(double value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeBytes(ByteBuf src) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeBytes(ByteBuf src, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeBytes(byte[] src) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeBytes(ByteBuffer src) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int writeBytes(InputStream in, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int writeBytes(FileChannel in, long position, int length) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf writeZero(int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int writeCharSequence(CharSequence sequence, Charset charset) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int indexOf(int fromIndex, int toIndex, byte value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int bytesBefore(byte value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int bytesBefore(int length, byte value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int bytesBefore(int index, int length, byte value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int forEachByte(ByteProcessor processor) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int forEachByte(int index, int length, ByteProcessor processor) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int forEachByteDesc(ByteProcessor processor) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int forEachByteDesc(int index, int length, ByteProcessor processor) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf copy() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf copy(int index, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf slice() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf retainedSlice() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf slice(int index, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf retainedSlice(int index, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf duplicate() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf retainedDuplicate() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuffer nioBuffer() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuffer nioBuffer(int index, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuffer internalNioBuffer(int index, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuffer[] nioBuffers() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuffer[] nioBuffers(int index, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+
+        @Override
+        public byte[] array() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int arrayOffset() {
+            throw new UnsupportedOperationException();
+        }
+        @Override
+        public long memoryAddress() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String toString(Charset charset) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String toString(int index, int length, Charset charset) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int compareTo(ByteBuf buffer) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf retain(int increment) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int refCnt() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf retain() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf touch() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuf touch(Object hint) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean release() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean release(int decrement) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName() + '@' + Integer.toHexString(System.identityHashCode(this));
+        }
+
+        @Override
+        public int hashCode() {
+            return System.identityHashCode(this);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return obj == this;
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java
new file mode 100644
index 0000000000..6252bb71be
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.proto.checksum;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+import com.scurrilous.circe.checksum.IntHash;
+import com.scurrilous.circe.checksum.Java8IntHash;
+import com.scurrilous.circe.checksum.Java9IntHash;
+import com.scurrilous.circe.checksum.JniIntHash;
+import com.scurrilous.circe.crc.Sse42Crc32C;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.bookkeeper.proto.BookieProtoEncoding;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.ByteBufVisitor;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This test class was added to reproduce a bug in the checksum calculation when
+ * the payload is a CompositeByteBuf and this buffer has a reader index state other than 0.
+ * The reader index state gets lost in the unwrapping process.
+ *
+ * There were at least 2 different bugs. One that occured when the
+ * payload was >= BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD and the other when
+ * it was < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD.
+ * This test covers both useV2Protocol=true and useV2Protocol=false since the bug was triggered differently.
+ *
+ * The bug has been fixed and this test is here to make sure it doesn't happen again.
+ */
+@RunWith(Parameterized.class)
+public class CompositeByteBufUnwrapBugReproduceTest {
+    final byte[] testPayLoad;
+    final int defaultBufferPrefixLength;
+    private final boolean useV2Protocol;
+
+    // set to 0 to 3 to run a single scenario for debugging purposes
+    private static final int RUN_SINGLE_SCENARIO_FOR_DEBUGGING = -1;
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> testScenarios() {
+        List<Object[]> scenarios = Arrays.asList(new Object[][] {
+                {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, true},
+                {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, false},
+                {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, true},
+                {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, false}
+        });
+        if (RUN_SINGLE_SCENARIO_FOR_DEBUGGING >= 0) {
+            // pick a single scenario for debugging
+            scenarios = scenarios.subList(RUN_SINGLE_SCENARIO_FOR_DEBUGGING, 1);
+        }
+        return scenarios;
+    }
+
+    public CompositeByteBufUnwrapBugReproduceTest(int payloadSize, boolean useV2Protocol) {
+        this.testPayLoad = createTestPayLoad(payloadSize);
+        this.defaultBufferPrefixLength = payloadSize / 7;
+        this.useV2Protocol = useV2Protocol;
+    }
+
+    private static byte[] createTestPayLoad(int payloadSize) {
+        byte[] payload = new byte[payloadSize];
+        for (int i = 0; i < payloadSize; i++) {
+            payload[i] = (byte) i;
+        }
+        return payload;
+    }
+
+
+    /**
+     * A DigestManager that uses the given IntHash implementation for testing.
+     */
+    static class TestIntHashDigestManager extends DigestManager {
+        private final IntHash intHash;
+
+        public TestIntHashDigestManager(IntHash intHash, long ledgerId, boolean useV2Protocol,
+                                        ByteBufAllocator allocator) {
+            super(ledgerId, useV2Protocol, allocator);
+            this.intHash = intHash;
+        }
+
+        @Override
+        int getMacCodeLength() {
+            return 4;
+        }
+
+        @Override
+        boolean isInt32Digest() {
+            return true;
+        }
+
+        @Override
+        void populateValueAndReset(int digest, ByteBuf buf) {
+            buf.writeInt(digest);
+        }
+
+        @Override
+        int internalUpdate(int digest, ByteBuf data, int offset, int len) {
+            return intHash.resume(digest, data, offset, len);
+        }
+
+        @Override
+        int internalUpdate(int digest, byte[] buffer, int offset, int len) {
+            return intHash.resume(digest, buffer, offset, len);
+        }
+
+        @Override
+        boolean acceptsMemoryAddressBuffer() {
+            return intHash.acceptsMemoryAddressBuffer();
+        }
+    }
+
+    @Test
+    public void shouldCalculateChecksumForCompositeBuffer() {
+        ByteBuf testPayload = Unpooled.wrappedBuffer(testPayLoad);
+        byte[] referenceOutput = computeDigestAndPackageForSending(new Java8IntHash(), testPayload.retainedDuplicate());
+        assertDigestAndPackageMatchesReference(new Java8IntHash(), testPayload, referenceOutput);
+        assertDigestAndPackageMatchesReference(new Java9IntHash(), testPayload, referenceOutput);
+        if (Sse42Crc32C.isSupported()) {
+            assertDigestAndPackageMatchesReference(new JniIntHash(), testPayload, referenceOutput);
+        }
+        testPayload.release();
+    }
+
+    private void assertDigestAndPackageMatchesReference(IntHash intHash, ByteBuf payload, byte[] referenceOutput) {
+        assertDigestAndPackageScenario(intHash, payload.retainedDuplicate(), referenceOutput, testPayLoad,
+                "plain payload, no wrapping");
+
+        ByteBuf payload2 = wrapWithPrefixAndCompositeByteBufWithReaderIndexState(payload.retainedDuplicate(),
+                defaultBufferPrefixLength);
+        assertDigestAndPackageScenario(intHash, payload2, referenceOutput, testPayLoad,
+                "payload with prefix wrapped in CompositeByteBuf with readerIndex state");
+
+        ByteBuf payload3 = wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate(
+                payload.retainedDuplicate(), defaultBufferPrefixLength);
+        assertDigestAndPackageScenario(intHash, payload3, referenceOutput, testPayLoad,
+                "payload with prefix wrapped in 2 layers of CompositeByteBuf with readerIndex state in the outer "
+                        + "composite. In addition, the outer composite is duplicated twice.");
+
+        ByteBuf payload4 = wrapInCompositeByteBufAndSlice(payload.retainedDuplicate(), defaultBufferPrefixLength);
+        assertDigestAndPackageScenario(intHash, payload4, referenceOutput, testPayLoad,
+                "payload with prefix wrapped in CompositeByteBuf and sliced");
+    }
+
+    private void assertDigestAndPackageScenario(IntHash intHash, ByteBuf payload, byte[] referenceOutput,
+                                                byte[] testPayLoadArray,
+                                                String scenario) {
+        // this validates that the readable bytes in the payload match the TEST_PAYLOAD content
+        assertArrayEquals(testPayLoadArray, ByteBufUtil.getBytes(payload.duplicate()),
+                "input is invalid for scenario '" + scenario + "'");
+
+        ByteBuf visitedCopy = Unpooled.buffer(payload.readableBytes());
+        ByteBufVisitor.visitBuffers(payload, payload.readerIndex(), payload.readableBytes(),
+                new ByteBufVisitor.ByteBufVisitorCallback<Void>() {
+                    @Override
+                    public void visitBuffer(Void context, ByteBuf visitBuffer, int visitIndex, int visitLength) {
+                        visitedCopy.writeBytes(visitBuffer, visitIndex, visitLength);
+                    }
+
+                    @Override
+                    public void visitArray(Void context, byte[] visitArray, int visitIndex, int visitLength) {
+                        visitedCopy.writeBytes(visitArray, visitIndex, visitLength);
+                    }
+                }, null);
+
+        assertArrayEquals(ByteBufUtil.getBytes(visitedCopy), testPayLoadArray,
+                "visited copy is invalid for scenario '" + scenario + "'. Bug in ByteBufVisitor?");
+
+        // compute the digest and package
+        byte[] output = computeDigestAndPackageForSending(intHash, payload.duplicate());
+        if (referenceOutput == null) {
+            referenceOutput =
+                    computeDigestAndPackageForSending(new Java8IntHash(), Unpooled.wrappedBuffer(testPayLoadArray));
+        }
+        // this validates that the output matches the reference output
+        assertArrayEquals(referenceOutput, output, "output is invalid for scenario '" + scenario + "'");
+    }
+
+    private byte[] computeDigestAndPackageForSending(IntHash intHash, ByteBuf data) {
+        DigestManager digestManager = new TestIntHashDigestManager(intHash, 1, useV2Protocol, ByteBufAllocator.DEFAULT);
+        ReferenceCounted packagedBuffer =
+                digestManager.computeDigestAndPackageForSending(1, 0, data.readableBytes(), data,
+                        MacDigestManager.EMPTY_LEDGER_KEY, BookieProtocol.FLAG_NONE);
+        return packagedBufferToBytes(packagedBuffer);
+    }
+
+    ByteBuf wrapWithPrefixAndCompositeByteBufWithReaderIndexState(ByteBuf payload, int bufferPrefixLength) {
+        // create a new buffer with a prefix and the actual payload
+        ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes());
+        prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength));
+        prefixedPayload.writeBytes(payload);
+
+        // wrap the buffer in a composite buffer
+        CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer();
+        outerComposite.addComponent(true, prefixedPayload);
+
+        // set reader index state. this is the state that gets lost in the unwrapping process
+        outerComposite.readerIndex(bufferPrefixLength);
+
+        return outerComposite;
+    }
+
+    ByteBuf wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate(ByteBuf payload,
+                                                                                               int bufferPrefixLength) {
+        // create a new buffer with a prefix and the actual payload
+        ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes());
+        prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength));
+        prefixedPayload.writeBytes(payload);
+
+        CompositeByteBuf innerComposite = ByteBufAllocator.DEFAULT.compositeBuffer();
+        innerComposite.addComponent(true, prefixedPayload);
+        innerComposite.addComponent(true, Unpooled.EMPTY_BUFFER);
+
+        // wrap the buffer in a composite buffer
+        CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer();
+        outerComposite.addComponent(true, innerComposite);
+        outerComposite.addComponent(true, Unpooled.EMPTY_BUFFER);
+
+        // set reader index state. this is the state that gets lost in the unwrapping process
+        outerComposite.readerIndex(bufferPrefixLength);
+
+        return outerComposite.duplicate().duplicate();
+    }
+
+    ByteBuf wrapInCompositeByteBufAndSlice(ByteBuf payload, int bufferPrefixLength) {
+        // create a composite buffer
+        CompositeByteBuf compositeWithPrefix = ByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeWithPrefix.addComponent(true, Unpooled.wrappedBuffer(RandomUtils.nextBytes(bufferPrefixLength)));
+        compositeWithPrefix.addComponent(true, payload);
+
+        // return a slice of the composite buffer so that it returns the payload
+        return compositeWithPrefix.slice(bufferPrefixLength, payload.readableBytes());
+    }
+
+    private static byte[] packagedBufferToBytes(ReferenceCounted packagedBuffer) {
+        byte[] output;
+        if (packagedBuffer instanceof ByteBufList) {
+            ByteBufList bufList = (ByteBufList) packagedBuffer;
+            output = new byte[bufList.readableBytes()];
+            bufList.getBytes(output);
+            for (int i = 0; i < bufList.size(); i++) {
+                bufList.getBuffer(i).release();
+            }
+        } else if (packagedBuffer instanceof ByteBuf) {
+            output = ByteBufUtil.getBytes((ByteBuf) packagedBuffer);
+            packagedBuffer.release();
+        } else {
+            throw new RuntimeException("Unexpected type: " + packagedBuffer.getClass());
+        }
+        return output;
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
index ac7aca7722..88de17d0a9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
@@ -87,39 +86,6 @@ public class ByteBufListTest {
         assertEquals(b2.refCnt(), 0);
     }
 
-    @Test
-    public void testComposite() throws Exception {
-        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        b1.writerIndex(b1.capacity());
-        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        b2.writerIndex(b2.capacity());
-
-        CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
-        composite.addComponent(b1);
-        composite.addComponent(b2);
-
-        ByteBufList buf = ByteBufList.get(composite);
-
-        // composite is unwrapped into two parts
-        assertEquals(2, buf.size());
-        // and released
-        assertEquals(composite.refCnt(), 0);
-
-        assertEquals(256, buf.readableBytes());
-        assertEquals(b1, buf.getBuffer(0));
-        assertEquals(b2, buf.getBuffer(1));
-
-        assertEquals(buf.refCnt(), 1);
-        assertEquals(b1.refCnt(), 1);
-        assertEquals(b2.refCnt(), 1);
-
-        buf.release();
-
-        assertEquals(buf.refCnt(), 0);
-        assertEquals(b1.refCnt(), 0);
-        assertEquals(b2.refCnt(), 0);
-    }
-
     @Test
     public void testClone() throws Exception {
         ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java
index 65a77b1492..d90f8b7ea5 100644
--- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java
+++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java
@@ -71,12 +71,30 @@ public class Crc32cIntChecksum {
     /**
      * Computes incremental checksum with input previousChecksum and input payload
      *
-     * @param previousChecksum : previously computed checksum
-     * @param payload
-     * @return
+     * @param previousChecksum the previously computed checksum
+     * @param payload the data for which the checksum is to be computed
+     * @param offset the starting position in the payload
+     * @param len the number of bytes to include in the checksum computation
+     * @return the updated checksum
      */
     public static int resumeChecksum(int previousChecksum, ByteBuf payload, int offset, int len) {
         return CRC32C_HASH.resume(previousChecksum, payload, offset, len);
     }
 
+    /**
+     * Computes incremental checksum with input previousChecksum and input payload
+     *
+     * @param previousChecksum the previously computed checksum
+     * @param payload the data for which the checksum is to be computed
+     * @param offset the starting position in the payload
+     * @param len the number of bytes to include in the checksum computation
+     * @return the updated checksum
+     */
+    public static int resumeChecksum(int previousChecksum, byte[] payload, int offset, int len) {
+        return CRC32C_HASH.resume(previousChecksum, payload, offset, len);
+    }
+
+    public static boolean acceptsMemoryAddressBuffer() {
+        return CRC32C_HASH.acceptsMemoryAddressBuffer();
+    }
 }
diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java
index e8922e3a16..be98ae19be 100644
--- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java
+++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java
@@ -28,4 +28,8 @@ public interface IntHash {
     int resume(int current, ByteBuf buffer);
 
     int resume(int current, ByteBuf buffer, int offset, int len);
+
+    int resume(int current, byte[] buffer, int offset, int len);
+
+    boolean acceptsMemoryAddressBuffer();
 }
diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java
index fd548bc4de..2825c610b1 100644
--- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java
+++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java
@@ -53,4 +53,14 @@ public class Java8IntHash implements IntHash {
             return hash.resume(current, buffer.slice(offset, len).nioBuffer());
         }
     }
+
+    @Override
+    public int resume(int current, byte[] buffer, int offset, int len) {
+        return hash.resume(current, buffer, offset, len);
+    }
+
+    @Override
+    public boolean acceptsMemoryAddressBuffer() {
+        return false;
+    }
 }
diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java
index 31af153666..2e779a9276 100644
--- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java
+++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java
@@ -19,7 +19,6 @@
 package com.scurrilous.circe.checksum;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -77,7 +76,7 @@ public class Java9IntHash implements IntHash {
         return resume(0, buffer, offset, len);
     }
 
-    private int resume(int current, long address, int offset, int length) {
+    private int updateDirectByteBuffer(int current, long address, int offset, int length) {
         try {
             return (int) UPDATE_DIRECT_BYTEBUFFER.invoke(null, current, address, offset, offset + length);
         } catch (IllegalAccessException | InvocationTargetException e) {
@@ -85,7 +84,20 @@ public class Java9IntHash implements IntHash {
         }
     }
 
-    private int resume(int current, byte[] array, int offset, int length) {
+    @Override
+    public int resume(int current, byte[] array, int offset, int length) {
+        // the bit-wise complementing of the input and output is explained in the resume method below
+        current = ~current;
+        current = updateBytes(current, array, offset, length);
+        return ~current;
+    }
+
+    @Override
+    public boolean acceptsMemoryAddressBuffer() {
+        return true;
+    }
+
+    private static int updateBytes(int current, byte[] array, int offset, int length) {
         try {
             return (int) UPDATE_BYTES.invoke(null, current, array, offset, offset + length);
         } catch (IllegalAccessException | InvocationTargetException e) {
@@ -100,33 +112,37 @@ public class Java9IntHash implements IntHash {
 
     @Override
     public int resume(int current, ByteBuf buffer, int offset, int len) {
-        int negCrc = ~current;
+        // The input value is bit-wise complemented for two reasons:
+        // 1. The CRC32C algorithm is designed to start with a seed value where all bits are set to 1 (0xffffffff).
+        //    When 0 is initially passed in, ~0 results in the correct initial value (0xffffffff).
+        // 2. The CRC32C algorithm complements the final value as the last step. This method will always complement
+        //    the return value. Therefore, when the algorithm is used iteratively, it is necessary to complement
+        //    the input value to continue calculations.
+        // This allows the algorithm to be used incrementally without needing separate initialization and
+        // finalization steps.
+        current = ~current;
 
         if (buffer.hasMemoryAddress()) {
-            negCrc = resume(negCrc, buffer.memoryAddress(), offset, len);
+            current = updateDirectByteBuffer(current, buffer.memoryAddress(), offset, len);
         } else if (buffer.hasArray()) {
             int arrayOffset = buffer.arrayOffset() + offset;
-            negCrc = resume(negCrc, buffer.array(), arrayOffset, len);
-        } else if (buffer instanceof CompositeByteBuf) {
-           CompositeByteBuf compositeByteBuf = (CompositeByteBuf) buffer;
-           int loopedCurrent = current;
-           for (int i = 0; i < compositeByteBuf.numComponents(); i ++) {
-               loopedCurrent = resume(loopedCurrent, compositeByteBuf.component(i));
-           }
-           return loopedCurrent;
+            current = updateBytes(current, buffer.array(), arrayOffset, len);
         } else {
             byte[] b = TL_BUFFER.get();
             int toRead = len;
             int loopOffset = offset;
             while (toRead > 0) {
                 int length = Math.min(toRead, b.length);
-                buffer.slice(loopOffset, length).readBytes(b, 0, length);
-                negCrc = resume(negCrc, b, 0, length);
+                buffer.getBytes(loopOffset, b, 0, length);
+                current = updateBytes(current, b, 0, length);
                 toRead -= length;
                 loopOffset += length;
             }
         }
 
-        return ~negCrc;
+        // The current value is complemented to align with the finalization step of the CRC32C algorithm.
+        // If there is a subsequent resume step, the value will be complemented again to initiate the next step
+        // as described in the comments in the beginning of this method.
+        return ~current;
     }
 }
diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java
index e8e87bf6b1..dc5bed0fc1 100644
--- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java
+++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java
@@ -51,4 +51,14 @@ public class JniIntHash implements IntHash {
             return hash.resume(current, buffer.slice(offset, len).nioBuffer());
         }
     }
+
+    @Override
+    public int resume(int current, byte[] buffer, int offset, int len) {
+        return hash.resume(current, buffer, offset, len);
+    }
+
+    @Override
+    public boolean acceptsMemoryAddressBuffer() {
+        return true;
+    }
 }