You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/18 12:08:34 UTC

[GitHub] sijie closed pull request #1141: Replace DoubleByteBuf with ByteBufList

sijie closed pull request #1141: Replace DoubleByteBuf with ByteBufList
URL: https://github.com/apache/bookkeeper/pull/1141
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 08f548c76..db8ccbff1 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -37,6 +37,7 @@
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -186,7 +187,7 @@ public static void main(String[] args)
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                    entry, ByteBufList.get(toSend), tc, null, BookieProtocol.FLAG_NONE);
         }
         LOG.info("Waiting for warmup");
         tc.waitFor(warmUpCount);
@@ -203,7 +204,7 @@ public static void main(String[] args)
             toSend.writerIndex(toSend.capacity());
             lc.resetComplete();
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
-                        entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
+                    entry, ByteBufList.get(toSend), lc, null, BookieProtocol.FLAG_NONE);
             lc.waitForComplete();
         }
         long endTime = System.nanoTime();
@@ -221,7 +222,7 @@ public static void main(String[] args)
             toSend.writeLong(entry);
             toSend.writerIndex(toSend.capacity());
             bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
-                        entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
+                    entry, ByteBufList.get(toSend), tc, null, BookieProtocol.FLAG_NONE);
         }
         tc.waitFor(throughputCount);
         endTime = System.currentTimeMillis();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index 5acaeae87..c77843a6c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -20,13 +20,12 @@
  */
 package org.apache.bookkeeper.client;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.SyncCallbackUtils.LastAddConfirmedCallback;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,7 +138,8 @@ void asyncExplicitLacFlush(final long explicitLac) {
                 lh.bk.getMainWorkerPool().submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
-                        ByteBuf toSend = lh.macManager.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
+                        ByteBufList toSend = lh.macManager
+                                .computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
                         op.initiate(toSend);
                     }
                 });
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 31d8e21ef..5c2ff8c6d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -21,8 +21,8 @@
 
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -33,6 +33,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -44,6 +45,7 @@
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
@@ -313,13 +315,13 @@ public void readComplete(int rc, LedgerHandle lh,
                 final long dataLength = data.length;
                 numEntriesRead.inc();
                 numBytesRead.registerSuccessfulValue(dataLength);
-                ByteBuf toSend = lh.getDigestManager()
+                ByteBufList toSend = lh.getDigestManager()
                         .computeDigestAndPackageForSending(entryId,
                                 lh.getLastAddConfirmed(), entry.getLength(),
                                 Unpooled.wrappedBuffer(data, 0, data.length));
                 for (BookieSocketAddress newBookie : newBookies) {
                     bkc.getBookieClient().addEntry(newBookie, lh.getId(),
-                            lh.getLedgerKey(), entryId, toSend.retainedSlice(),
+                            lh.getLedgerKey(), entryId, ByteBufList.clone(toSend),
                             multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD);
                 }
                 toSend.release();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 8d65c0009..e40f29caf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
@@ -53,7 +54,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);
 
     ByteBuf payload;
-    ByteBuf toSend;
+    ByteBufList toSend;
     AddCallbackWithLatency cb;
     Object ctx;
     long entryId;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 950959b2a..af45e296b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -17,14 +17,13 @@
  */
 package org.apache.bookkeeper.client;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.BitSet;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +38,7 @@
  */
 class PendingWriteLacOp implements WriteLacCallback {
     private static final Logger LOG = LoggerFactory.getLogger(PendingWriteLacOp.class);
-    ByteBuf toSend;
+    ByteBufList toSend;
     AddLacCallback cb;
     long lac;
     Object ctx;
@@ -75,7 +74,7 @@ void sendWriteLacRequest(int bookieIndex) {
                 lac, toSend, this, bookieIndex);
     }
 
-    void initiate(ByteBuf toSend) {
+    void initiate(ByteBufList toSend) {
         this.toSend = toSend;
         DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(lac);
         try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 08ed19437..1623324d8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -60,6 +60,7 @@
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
@@ -187,7 +188,7 @@ public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
     }
 
     public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
-            final long lac, final ByteBuf toSend, final WriteLacCallback cb, final Object ctx) {
+            final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr);
@@ -250,7 +251,7 @@ public void addEntry(final BookieSocketAddress addr,
                          final long ledgerId,
                          final byte[] masterKey,
                          final long entryId,
-                         final ByteBuf toSend,
+                         final ByteBufList toSend,
                          final WriteCallback cb,
                          final Object ctx,
                          final int options) {
@@ -300,7 +301,7 @@ public void safeRun() {
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
 
         private BookieClient bookieClient;
-        private ByteBuf toSend;
+        private ByteBufList toSend;
         private long ledgerId;
         private long entryId;
         private BookieSocketAddress addr;
@@ -310,7 +311,7 @@ public void safeRun() {
         private byte[] masterKey;
 
         static ChannelReadyForAddEntryCallback create(
-                BookieClient bookieClient, ByteBuf toSend, long ledgerId,
+                BookieClient bookieClient, ByteBufList toSend, long ledgerId,
                 long entryId, BookieSocketAddress addr, Object ctx,
                 WriteCallback cb, int options, byte[] masterKey) {
             ChannelReadyForAddEntryCallback callback = RECYCLER.get();
@@ -619,7 +620,7 @@ public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress a
 
         for (int i = 0; i < 100000; i++) {
             counter.inc();
-            bc.addEntry(addr, ledger, new byte[0], i, Unpooled.wrappedBuffer(hello), cb, counter, 0);
+            bc.addEntry(addr, ledger, new byte[0], i, ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0);
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 3c5385c5c..83a37776c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -73,6 +73,7 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.commons.lang.SystemUtils;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -302,6 +303,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
                         new BookieSideConnectionPeerContextHandler();
                     ChannelPipeline pipeline = ch.pipeline();
 
+                    // For ByteBufList, skip the usual LengthFieldPrepender and have the encoder itself to add it
+                    pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
+
                     pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
                     pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 33bb675bd..24ef1172a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -41,7 +41,7 @@
 
 import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
-import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,9 +111,10 @@ public Object encode(Object msg, ByteBufAllocator allocator)
                 ByteBuf buf = allocator.buffer(totalHeaderSize);
                 buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
                 buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
-                ByteBuf data = ar.getData();
+                ByteBufList data = ar.getData();
                 ar.recycle();
-                return DoubleByteBuf.get(buf, data);
+                data.prepend(buf);
+                return data;
             } else if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
@@ -166,7 +167,7 @@ public Object decode(ByteBuf packet)
                 // Read ledger and entry id without advancing the reader index
                 ledgerId = packet.getLong(packet.readerIndex());
                 entryId = packet.getLong(packet.readerIndex() + 8);
-                return BookieProtocol.AddRequest.create(
+                return BookieProtocol.ParsedAddRequest.create(
                         version, ledgerId, entryId, flags,
                         masterKey, packet.retain());
             }
@@ -246,7 +247,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
 
                     BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse) r;
                     if (rr.hasData()) {
-                        return DoubleByteBuf.get(buf, rr.getData());
+                        return ByteBufList.get(buf, rr.getData());
                     } else {
                         return buf;
                     }
@@ -258,7 +259,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
                     return buf;
                 } else if (msg instanceof BookieProtocol.AuthResponse) {
                     BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage();
-                    return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
+                    return ByteBufList.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
                 } else {
                     LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
                     return msg;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 4ad23d9f9..3136c57ec 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -26,6 +26,7 @@
 import io.netty.util.Recycler.Handle;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ByteBufList;
 
 /**
  * The packets of the Bookie protocol all have a 4-byte integer indicating the
@@ -244,11 +245,11 @@ public void recycle() {}
      * A Request that adds data.
      */
     class AddRequest extends Request {
-        ByteBuf data;
+        ByteBufList data;
 
         static AddRequest create(byte protocolVersion, long ledgerId,
                                  long entryId, short flags, byte[] masterKey,
-                                 ByteBuf data) {
+                                 ByteBufList data) {
             AddRequest add = RECYCLER.get();
             add.protocolVersion = protocolVersion;
             add.opCode = ADDENTRY;
@@ -260,8 +261,9 @@ static AddRequest create(byte protocolVersion, long ledgerId,
             return add;
         }
 
-        ByteBuf getData() {
-            return data;
+        ByteBufList getData() {
+            // We need to have different ByteBufList instances for each bookie write
+            return ByteBufList.clone(data);
         }
 
         boolean isRecoveryAdd() {
@@ -293,6 +295,59 @@ public void recycle() {
         }
     }
 
+    /**
+     * This is similar to add request, but it used when processing the request on the bookie side.
+     */
+    class ParsedAddRequest extends Request {
+        ByteBuf data;
+
+        static ParsedAddRequest create(byte protocolVersion, long ledgerId, long entryId, short flags, byte[] masterKey,
+                ByteBuf data) {
+            ParsedAddRequest add = RECYCLER.get();
+            add.protocolVersion = protocolVersion;
+            add.opCode = ADDENTRY;
+            add.ledgerId = ledgerId;
+            add.entryId = entryId;
+            add.flags = flags;
+            add.masterKey = masterKey;
+            add.data = data.retain();
+            return add;
+        }
+
+        ByteBuf getData() {
+            // We need to have different ByteBufList instances for each bookie write
+            return data;
+        }
+
+        boolean isRecoveryAdd() {
+            return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
+        }
+
+        void release() {
+            data.release();
+        }
+
+        private final Handle<ParsedAddRequest> recyclerHandle;
+        private ParsedAddRequest(Handle<ParsedAddRequest> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<ParsedAddRequest> RECYCLER = new Recycler<ParsedAddRequest>() {
+            protected ParsedAddRequest newObject(Handle<ParsedAddRequest> handle) {
+                return new ParsedAddRequest(handle);
+            }
+        };
+
+        @Override
+        public void recycle() {
+            ledgerId = -1;
+            entryId = -1;
+            masterKey = null;
+            data = null;
+            recyclerHandle.recycle(this);
+        }
+    }
+
     /**
      * A Request that reads data.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 75b319308..666b644ea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -116,6 +116,7 @@
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -423,6 +424,7 @@ protected ChannelFuture connect() {
             protected void initChannel(Channel ch) throws Exception {
                 ChannelPipeline pipeline = ch.pipeline();
 
+                pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
                 pipeline.addLast("lengthbasedframedecoder",
                         new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
                 pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
@@ -494,7 +496,7 @@ void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
 
     }
 
-    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBuf toSend, WriteLacCallback cb,
+    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBufList toSend, WriteLacCallback cb,
             Object ctx) {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new V3CompletionKey(txnId,
@@ -513,7 +515,7 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
                 .setLedgerId(ledgerId)
                 .setLac(lac)
                 .setMasterKey(ByteString.copyFrom(masterKey))
-                .setBody(ByteString.copyFrom(toSend.nioBuffer()));
+                .setBody(ByteString.copyFrom(toSend.toArray()));
 
         final Request writeLacRequest = Request.newBuilder()
                 .setHeader(headerBuilder)
@@ -541,7 +543,7 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
      * @param options
      *          Add options
      */
-    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf toSend, WriteCallback cb,
+    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
                   Object ctx, final int options) {
         Object request = null;
         CompletionKey completionKey = null;
@@ -560,8 +562,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
                     .setOperation(OperationType.ADD_ENTRY)
                     .setTxnId(txnId);
 
-            byte[] toSendArray = new byte[toSend.readableBytes()];
-            toSend.getBytes(toSend.readerIndex(), toSendArray);
+            byte[] toSendArray = toSend.toArray();
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 34e2c2c3f..164a11b42 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -56,8 +56,8 @@ public static WriteEntryProcessor create(Request request, Channel channel,
 
     @Override
     protected void processPacket() {
-        assert (request instanceof BookieProtocol.AddRequest);
-        BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) request;
+        assert (request instanceof BookieProtocol.ParsedAddRequest);
+        BookieProtocol.ParsedAddRequest add = (BookieProtocol.ParsedAddRequest) request;
 
         if (requestProcessor.bookie.isReadOnly()) {
             LOG.warn("BookieServer is running in readonly mode,"
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index cf6b315b5..2f85e4f75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.proto.checksum;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -27,7 +26,7 @@
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +87,8 @@ public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType
      * @param data
      * @return
      */
-    public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, ByteBuf data) {
+    public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length,
+            ByteBuf data) {
         ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(entryId);
@@ -99,7 +99,7 @@ public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfi
         update(data);
         populateValueAndReset(headersBuffer);
 
-        return DoubleByteBuf.get(headersBuffer, data);
+        return ByteBufList.get(headersBuffer, data);
     }
 
     /**
@@ -109,7 +109,7 @@ public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfi
      * @return
      */
 
-    public ByteBuf computeDigestAndPackageForSendingLac(long lac) {
+    public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
         ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(lac);
@@ -117,7 +117,7 @@ public ByteBuf computeDigestAndPackageForSendingLac(long lac) {
         update(headersBuffer);
         populateValueAndReset(headersBuffer);
 
-        return headersBuffer;
+        return ByteBufList.get(headersBuffer);
     }
 
     private void verifyDigest(ByteBuf dataReceived) throws BKDigestMatchException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
new file mode 100644
index 000000000..37d4c72c9
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
+
+import java.util.ArrayList;
+
+/**
+ * ByteBufList is a holder of a sequence of {@link ByteBuf} objects.
+ *
+ * <p>This class doesn't trying to mimic the {@link ByteBuf}, but rather exposes itself just like a regular object which
+ * will need to be encoded on the channel. There are 2 utility encoders:
+ * <ul>
+ * <li>{@link #ENCODER}: regular encode that will write all the buffers in the {@link ByteBufList} on the channel</li>
+ * <li>{@link #ENCODER_WITH_SIZE}: similar to the previous one, but also prepend a 4 bytes size header, once, carrying
+ * the size of the readable bytes across all the buffers contained in the {@link ByteBufList}</li>
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * bootstrap.handler(new ChannelInitializer&lt;SocketChannel&gt;() {
+ *     public void initChannel(SocketChannel ch) throws Exception {
+ *         ChannelPipeline pipeline = ch.pipeline();
+ *         pipeline.addLast("bytebufList", ByteBufList.ENCODER);
+ *         pipeline.addLast("mainhandler", MyHandler.class);
+ *     }
+ * });
+ * </pre>
+ *
+ * <p>ByteBufList is pooling the instances and uses ref-counting to release them.
+ */
+public class ByteBufList extends AbstractReferenceCounted {
+    private final ArrayList<ByteBuf> buffers;
+    private final Handle<ByteBufList> recyclerHandle;
+
+    private static final int INITIAL_LIST_SIZE = 4;
+
+    private static final Recycler<ByteBufList> RECYCLER = new Recycler<ByteBufList>() {
+        @Override
+        protected ByteBufList newObject(Recycler.Handle<ByteBufList> handle) {
+            return new ByteBufList(handle);
+        }
+    };
+
+    private ByteBufList(Handle<ByteBufList> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+        this.buffers = new ArrayList<>(INITIAL_LIST_SIZE);
+    }
+
+    /**
+     * Get a new {@link ByteBufList} from the pool and assign 2 buffers to it.
+     *
+     * <p>The buffers b1 and b2 lifecycles are now managed by the ByteBufList: when the {@link ByteBufList} is
+     * deallocated, b1 and b2 will be released as well.
+     *
+     * @param b1
+     *            first buffer
+     * @param b2
+     *            second buffer
+     * @return a {@link ByteBufList} instance from the pool
+     */
+    public static ByteBufList get(ByteBuf b1, ByteBuf b2) {
+        ByteBufList buf = get();
+        buf.add(b1);
+        buf.add(b2);
+        return buf;
+    }
+
+    /**
+     * Get a new {@link ByteBufList} from the pool and assign 1 buffer to it.
+     *
+     * <p>The buffer b1 lifecycle is now managed by the ByteBufList: when the {@link ByteBufList} is
+     * deallocated, b1 will be released as well.
+     *
+     * @param b1
+     *            first buffer
+     * @return a {@link ByteBufList} instance from the pool
+     */
+    public static ByteBufList get(ByteBuf b1) {
+        ByteBufList buf = get();
+        buf.add(b1);
+        return buf;
+    }
+
+    /**
+     * Get a new {@link ByteBufList} instance from the pool that is the clone of an already existing instance.
+     */
+    public static ByteBufList clone(ByteBufList other) {
+        ByteBufList buf = get();
+        for (int i = 0; i < other.buffers.size(); i++) {
+            // Create a duplicate of the buffer so that there is no interference from other threads
+            buf.add(other.buffers.get(i).retainedDuplicate());
+        }
+        return buf;
+    }
+
+    private static ByteBufList get() {
+        ByteBufList buf = RECYCLER.get();
+        buf.setRefCnt(1);
+        return buf;
+    }
+
+    /**
+     * Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
+     */
+    public void add(ByteBuf buf) {
+        buffers.add(buf);
+    }
+
+    /**
+     * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
+     */
+    public void prepend(ByteBuf buf) {
+        buffers.add(0, buf);
+    }
+
+    /**
+     * @return the total amount of readable bytes across all the {@link ByteBuf} included in the list
+     */
+    public int readableBytes() {
+        int readableBytes = 0;
+        for (int i = 0; i < buffers.size(); i++) {
+            readableBytes += buffers.get(i).readableBytes();
+        }
+        return readableBytes;
+    }
+
+    /**
+     * Get access to a particular buffer in the list.
+     *
+     * @param index
+     *            the index of the buffer
+     * @return the buffer
+     */
+    public ByteBuf getBuffer(int index) {
+        return buffers.get(index);
+    }
+
+    /**
+     * @return the number of buffers included in the {@link ByteBufList}
+     */
+    public int size() {
+        return buffers.size();
+    }
+
+    /**
+     * Write bytes from the current {@link ByteBufList} into a byte array.
+     *
+     * <p>This won't modify the reader index of the internal buffers.
+     *
+     * @param dst
+     *            the destination byte array
+     * @return the number of copied bytes
+     */
+    public int getBytes(byte[] dst) {
+        int copied = 0;
+        for (int idx = 0; idx < buffers.size() && copied < dst.length; idx++) {
+            ByteBuf b = buffers.get(idx);
+            int len = Math.min(b.readableBytes(), dst.length - copied);
+            b.getBytes(b.readerIndex(), dst, copied, len);
+
+            copied += len;
+        }
+
+        return copied;
+    }
+
+    /**
+     * @return an array containing all the internal buffers content
+     */
+    public byte[] toArray() {
+        byte[] a = new byte[readableBytes()];
+        getBytes(a);
+        return a;
+    }
+
+    /**
+     * @return a single buffer with the content of both individual buffers
+     */
+    @VisibleForTesting
+    public static ByteBuf coalesce(ByteBufList list) {
+        ByteBuf res = Unpooled.buffer(list.readableBytes());
+        for (int i = 0; i < list.buffers.size(); i++) {
+            ByteBuf b = list.buffers.get(i);
+            res.writeBytes(b, b.readerIndex(), b.readableBytes());
+        }
+
+        return res;
+    }
+
+    @Override
+    public ByteBufList retain() {
+        super.retain();
+        return this;
+    }
+
+    @Override
+    protected void deallocate() {
+        for (int i = 0; i < buffers.size(); i++) {
+            buffers.get(i).release();
+        }
+
+        buffers.clear();
+        recyclerHandle.recycle(this);
+    }
+
+    @Override
+    public ReferenceCounted touch(Object hint) {
+        for (int i = 0; i < buffers.size(); i++) {
+            buffers.get(i).touch(hint);
+        }
+        return this;
+    }
+
+    /**
+     * Encoder for the {@link ByteBufList} that doesn't prepend any size header.
+     */
+    public static final Encoder ENCODER = new Encoder(false);
+
+    /**
+     * Encoder for the {@link ByteBufList} that will prepend a 4 byte header with the size of the whole
+     * {@link ByteBufList} readable bytes.
+     */
+    public static final Encoder ENCODER_WITH_SIZE = new Encoder(true);
+
+    /**
+     * {@link ByteBufList} encoder.
+     */
+    @Sharable
+    public static class Encoder extends ChannelOutboundHandlerAdapter {
+
+        private final boolean prependSize;
+
+        public Encoder(boolean prependSize) {
+            this.prependSize = prependSize;
+        }
+
+        @Override
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+            if (msg instanceof ByteBufList) {
+                ByteBufList b = (ByteBufList) msg;
+
+                try {
+                    if (prependSize) {
+                        // Prepend the frame size before writing the buffer list, so that we only have 1 single size
+                        // header
+                        ByteBuf sizeBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(4, 4);
+                        sizeBuffer.writeInt(b.readableBytes());
+                        ctx.write(sizeBuffer, ctx.voidPromise());
+                    }
+
+                    // Write each buffer individually on the socket. The retain() here is needed to preserve the fact
+                    // that ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased
+                    // and it gets written multiple times, the individual buffers refcount should be reflected as well.
+                    int buffersCount = b.buffers.size();
+                    for (int i = 0; i < buffersCount; i++) {
+                        ByteBuf bx = b.buffers.get(i);
+                        // Last buffer will carry on the final promise to notify when everything was written on the
+                        // socket
+                        ctx.write(bx.retainedDuplicate(), i == (buffersCount - 1) ? promise : ctx.voidPromise());
+                    }
+                } finally {
+                    ReferenceCountUtil.safeRelease(b);
+                }
+            } else {
+                ctx.write(msg, promise);
+            }
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
deleted file mode 100644
index 7a953685b..000000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.bookkeeper.util;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-
-/**
- * Removed custom implementation of DoubleByteBuf, just relying on straight regular CompositeByteBuf.
- */
-public class DoubleByteBuf extends CompositeByteBuf {
-
-    public DoubleByteBuf(ByteBufAllocator alloc) {
-        super(alloc, true, 2);
-    }
-
-    public static DoubleByteBuf get(ByteBuf b1, ByteBuf b2) {
-        DoubleByteBuf cbb = new DoubleByteBuf(b1.alloc());
-        cbb.addComponent(true, b1);
-        cbb.addComponent(true, b2);
-        return cbb;
-    }
-}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 25842f1da..8e2dd7dd4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -17,11 +17,14 @@
  */
 package org.apache.bookkeeper.client;
 
-import static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType.CRC32;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
 import java.security.GeneralSecurityException;
+
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
 
 /**
  * Client utilities.
@@ -32,11 +35,11 @@ public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddCo
         return generatePacket(ledgerId, entryId, lastAddConfirmed, length, data, 0, data.length);
     }
 
-    public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length,
-                                         byte[] data, int offset, int len) throws GeneralSecurityException {
-        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], CRC32);
-        return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
-                                                    Unpooled.wrappedBuffer(data, offset, len));
+    public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length, byte[] data,
+            int offset, int len) throws GeneralSecurityException {
+        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32);
+        return ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
+                Unpooled.wrappedBuffer(data, offset, len)));
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
index 2c3fb7d9f..7098f4ee4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
@@ -19,6 +19,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+
+import org.apache.bookkeeper.util.ByteBufList;
+
 /**
  * Adapter for tests to get the public access from LedgerHandle for its default
  * scope.
@@ -31,4 +35,9 @@
     public static LedgerMetadata getLedgerMetadata(LedgerHandle lh) {
         return lh.getLedgerMetadata();
     }
+
+    public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf data) {
+        return lh.getDigestManager().computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(),
+                lh.addToLength(data.readableBytes()), data);
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 9a89c7056..f17764857 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -60,6 +60,7 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
@@ -416,9 +417,10 @@ protected void setupBookieClientReadEntry() {
                 if (mockEntry != null) {
                     LOG.info("readEntryAndFenceLedger - found mock entry {}@{} at {}", entryId, ledgerId,
                             bookieSocketAddress);
-                    ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId, mockEntry.lastAddConfirmed,
-                        mockEntry.payload.length, Unpooled.wrappedBuffer(mockEntry.payload));
-                    callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, Unpooled.copiedBuffer(entry),
+                    ByteBufList entry = macManager.computeDigestAndPackageForSending(entryId,
+                            mockEntry.lastAddConfirmed, mockEntry.payload.length,
+                            Unpooled.wrappedBuffer(mockEntry.payload));
+                    callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, ByteBufList.coalesce(entry),
                             args[5]);
                     entry.release();
                 } else {
@@ -456,10 +458,10 @@ protected void setupBookieClientReadEntry() {
                 }
                 if (mockEntry != null) {
                     LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
-                    ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId,
+                    ByteBufList entry = macManager.computeDigestAndPackageForSending(entryId,
                         mockEntry.lastAddConfirmed, mockEntry.payload.length,
                         Unpooled.wrappedBuffer(mockEntry.payload));
-                    callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, Unpooled.copiedBuffer(entry),
+                    callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, ByteBufList.coalesce(entry),
                             args[4]);
                     entry.release();
                 } else {
@@ -532,7 +534,7 @@ protected void setupBookieClientAddEntry() {
             return null;
         }).when(bookieClient).addEntry(any(BookieSocketAddress.class),
             anyLong(), any(byte[].class),
-            anyLong(), any(ByteBuf.class),
+            anyLong(), any(ByteBufList.class),
             any(BookkeeperInternalCallbacks.WriteCallback.class),
             any(), anyInt());
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 8e879218c..8c4953136 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -54,6 +54,7 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
@@ -360,7 +361,7 @@ public void testRecoveryOnEntryGap() throws Exception {
         long entryId = 14;
         long lac = 8;
         byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8);
-        ByteBuf toSend =
+        ByteBufList toSend =
                 lh.macManager.computeDigestAndPackageForSending(
                         entryId, lac, lh.getLength() + 100, Unpooled.wrappedBuffer(data, 0, data.length));
         final CountDownLatch addLatch = new CountDownLatch(1);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 1e0750f17..4cf25912d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -52,6 +52,7 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
@@ -156,7 +157,7 @@ public void testWriteGaps() throws Exception {
 
         BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
                                            scheduler, NullStatsLogger.INSTANCE);
-        ByteBuf bb = createByteBuffer(1, 1, 1);
+        ByteBufList bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE);
         synchronized (arc) {
             arc.wait(1000);
@@ -242,13 +243,13 @@ public void testWriteGaps() throws Exception {
         }
     }
 
-    private ByteBuf createByteBuffer(int i, long lid, long eid) {
+    private ByteBufList createByteBuffer(int i, long lid, long eid) {
         ByteBuf bb = Unpooled.buffer(4 + 24);
         bb.writeLong(lid);
         bb.writeLong(eid);
         bb.writeLong(eid - 1);
         bb.writeInt(i);
-        return bb;
+        return ByteBufList.get(bb);
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
new file mode 100644
index 000000000..19c841be4
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.net.SocketAddress;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ByteBufList}.
+ */
+public class ByteBufListTest {
+    @Test
+    public void testSingle() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBufList buf = ByteBufList.get(b1);
+
+        assertEquals(1, buf.size());
+        assertEquals(128, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+    }
+
+    @Test
+    public void testDouble() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBufList buf = ByteBufList.get(b1, b2);
+
+        assertEquals(2, buf.size());
+        assertEquals(256, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+        assertEquals(b2, buf.getBuffer(1));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+        assertEquals(b2.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
+    @Test
+    public void testClone() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBufList buf = ByteBufList.get(b1, b2);
+
+        ByteBufList clone = ByteBufList.clone(buf);
+
+        assertEquals(2, buf.size());
+        assertEquals(256, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+        assertEquals(b2, buf.getBuffer(1));
+
+        assertEquals(2, clone.size());
+        assertEquals(256, clone.readableBytes());
+        assertEquals(b1, clone.getBuffer(0));
+        assertEquals(b2, clone.getBuffer(1));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(clone.refCnt(), 1);
+        assertEquals(b1.refCnt(), 2);
+        assertEquals(b2.refCnt(), 2);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(clone.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+        assertEquals(b2.refCnt(), 1);
+
+        clone.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(clone.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
+    @Test
+    public void testGetBytes() throws Exception {
+        ByteBufList buf = ByteBufList.get(Unpooled.wrappedBuffer("hello".getBytes()),
+                Unpooled.wrappedBuffer("world".getBytes()));
+
+        assertArrayEquals("helloworld".getBytes(), buf.toArray());
+
+        buf.prepend(Unpooled.wrappedBuffer("prefix-".getBytes()));
+        assertArrayEquals("prefix-helloworld".getBytes(), buf.toArray());
+
+        // Bigger buffer
+        byte[] buf100 = new byte[100];
+        int res = buf.getBytes(buf100);
+
+        assertEquals("prefix-helloworld".length(), res);
+
+        // Smaller buffer
+        byte[] buf4 = new byte[4];
+        res = buf.getBytes(buf4);
+
+        assertEquals(4, res);
+        assertEquals("pref", new String(buf4));
+    }
+
+    @Test
+    public void testCoalesce() throws Exception {
+        ByteBufList buf = ByteBufList.get(Unpooled.wrappedBuffer("hello".getBytes()),
+                Unpooled.wrappedBuffer("world".getBytes()));
+
+        assertEquals(Unpooled.wrappedBuffer("helloworld".getBytes()), ByteBufList.coalesce(buf));
+    }
+
+    @Test
+    public void testRetain() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBufList buf = ByteBufList.get(b1);
+
+        assertEquals(1, buf.size());
+        assertEquals(128, buf.readableBytes());
+        assertEquals(b1, buf.getBuffer(0));
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.retain();
+
+        assertEquals(buf.refCnt(), 2);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+
+        buf.release();
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+    }
+
+    @Test
+    public void testEncoder() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBufList buf = ByteBufList.get(b1, b2);
+
+        ChannelHandlerContext ctx = new MockChannelHandlerContext();
+
+        ByteBufList.ENCODER.write(ctx, buf, null);
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
+    class MockChannelHandlerContext implements ChannelHandlerContext {
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture disconnect() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture close() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture deregister() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture disconnect(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture close(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture deregister(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture write(Object msg) {
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelFuture write(Object msg, ChannelPromise promise) {
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg) {
+            ReferenceCountUtil.safeRelease(msg);
+            return null;
+        }
+
+        @Override
+        public ChannelPromise newPromise() {
+            return null;
+        }
+
+        @Override
+        public ChannelProgressivePromise newProgressivePromise() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture newSucceededFuture() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture newFailedFuture(Throwable cause) {
+            return null;
+        }
+
+        @Override
+        public ChannelPromise voidPromise() {
+            return null;
+        }
+
+        @Override
+        public Channel channel() {
+            return null;
+        }
+
+        @Override
+        public EventExecutor executor() {
+            return null;
+        }
+
+        @Override
+        public String name() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandler handler() {
+            return null;
+        }
+
+        @Override
+        public boolean isRemoved() {
+            return false;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelRegistered() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelUnregistered() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelActive() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelInactive() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireUserEventTriggered(Object evt) {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelRead(Object msg) {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelReadComplete() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext fireChannelWritabilityChanged() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext read() {
+            return null;
+        }
+
+        @Override
+        public ChannelHandlerContext flush() {
+            return null;
+        }
+
+        @Override
+        public ChannelPipeline pipeline() {
+            return null;
+        }
+
+        @Override
+        public ByteBufAllocator alloc() {
+            return null;
+        }
+
+        @Override
+        @Deprecated
+        public <T> Attribute<T> attr(AttributeKey<T> key) {
+            return null;
+        }
+
+        @Override
+        @Deprecated
+        public <T> boolean hasAttr(AttributeKey<T> key) {
+            return false;
+        }
+
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
deleted file mode 100644
index 82e706f3b..000000000
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.bookkeeper.util;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-
-/**
- * Test the Double byte buffer.
- */
-public class DoubleByteBufTest {
-
-    @Test
-    public void testGetBytes() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 4, 5, 6 });
-        doTest(b1, b2);
-    }
-
-    @Test
-    public void testGetBytesWithDoubleByteBufAssource() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b3 = Unpooled.wrappedBuffer(new byte[] { 5, 6 });
-
-        ByteBuf b23 = DoubleByteBuf.get(b2, b3);
-        doTest(b1, b23);
-    }
-
-    @Test
-    public void testGetBytesWithIndex() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 9, 9, 4, 5, 6 });
-
-        // Skip the two '9' from b2
-        b2.readByte();
-        b2.readByte();
-
-        doTest(b1, b2);
-    }
-
-    private void doTest(ByteBuf b1, ByteBuf b2) {
-        ByteBuf buf = DoubleByteBuf.get(b1, b2);
-
-        assertEquals(6, buf.readableBytes());
-        assertEquals(0, buf.writableBytes());
-
-        ByteBuf dst1 = Unpooled.buffer(6);
-        buf.getBytes(0, dst1);
-        assertEquals(6, dst1.readableBytes());
-        assertEquals(0, dst1.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6 }), dst1);
-
-        ByteBuf dst2 = Unpooled.buffer(6);
-        buf.getBytes(0, dst2, 4);
-        assertEquals(4, dst2.readableBytes());
-        assertEquals(2, dst2.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4 }), dst2);
-
-        ByteBuf dst3 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
-        buf.getBytes(0, dst3, 1, 4);
-        assertEquals(6, dst3.readableBytes());
-        assertEquals(0, dst3.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 0 }), dst3);
-
-        ByteBuf dst4 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
-        buf.getBytes(2, dst4, 1, 3);
-        assertEquals(6, dst4.readableBytes());
-        assertEquals(0, dst4.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 3, 4, 5, 0, 0 }), dst4);
-
-        ByteBuf dst5 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
-        buf.getBytes(3, dst5, 1, 3);
-        assertEquals(6, dst5.readableBytes());
-        assertEquals(0, dst5.writableBytes());
-        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 4, 5, 6, 0, 0 }), dst5);
-    }
-
-    @Test
-    public void testCopyToArray() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-
-        byte[] a1 = new byte[4];
-        b.getBytes(0, a1);
-        assertArrayEquals(new byte[] { 1, 2, 3, 4 }, a1);
-
-        byte[] a2 = new byte[3];
-        b.getBytes(1, a2);
-        assertArrayEquals(new byte[] { 2, 3, 4 }, a2);
-    }
-
-    @Test
-    public void testToByteBuffer() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-
-        assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer());
-    }
-
-    @Test
-    public void testNonDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertFalse(b1.isDirect());
-        assertFalse(b2.isDirect());
-        assertFalse(b.isDirect());
-        ByteBuffer nioBuffer = b.nioBuffer();
-        assertFalse(nioBuffer.isDirect());
-    }
-
-    @Test
-    public void testNonDirectPlusDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b2 = Unpooled.directBuffer(2);
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertFalse(b1.isDirect());
-        assertTrue(b2.isDirect());
-        assertFalse(b.isDirect());
-        ByteBuffer nioBuffer = b.nioBuffer();
-        assertFalse(nioBuffer.isDirect());
-    }
-
-    @Test
-    public void testDirectPlusNonDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.directBuffer(2);
-        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertTrue(b1.isDirect());
-        assertFalse(b2.isDirect());
-        assertFalse(b.isDirect());
-        ByteBuffer nioBuffer = b.nioBuffer();
-        assertFalse(nioBuffer.isDirect());
-    }
-
-    @Test
-    public void testDirectNioBuffer() {
-        ByteBuf b1 = Unpooled.directBuffer(2);
-        ByteBuf b2 = Unpooled.directBuffer(2);
-        ByteBuf b = DoubleByteBuf.get(b1, b2);
-        assertTrue(b1.isDirect());
-        assertTrue(b2.isDirect());
-        assertTrue(b.isDirect());
-    }
-
-    /**
-     * Verify that readableBytes() returns writerIndex - readerIndex. In this case writerIndex is the end of the buffer
-     * and readerIndex is increased by 64.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testReadableBytes() throws Exception {
-        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        b1.writerIndex(b1.capacity());
-        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
-        b2.writerIndex(b2.capacity());
-        ByteBuf buf = DoubleByteBuf.get(b1, b2);
-
-        assertEquals(buf.readerIndex(), 0);
-        assertEquals(buf.writerIndex(), 256);
-        assertEquals(buf.readableBytes(), 256);
-
-        for (int i = 0; i < 4; ++i) {
-            buf.skipBytes(64);
-            assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
-        }
-    }
-}
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
index 339f5d236..3995ea887 100644
--- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
@@ -23,11 +23,11 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.util.DoubleByteBuf;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -97,7 +97,7 @@
         private DigestManager mac;
 
         private ByteBuf arrayBackedBuffer;
-        private ByteBuf notArrayBackedBuffer;
+        private CompositeByteBuf notArrayBackedBuffer;
         private ByteBuf byteBufDefaultAlloc;
 
         public ByteBuf digestBuf;
@@ -119,8 +119,9 @@ public void doSetup() throws Exception {
             arrayBackedBuffer = Unpooled.wrappedBuffer(randomBytes(entrySize));
 
             final int headerSize = 32 + getDigestManager(digest).getMacCodeLength();
-            notArrayBackedBuffer = DoubleByteBuf.get(Unpooled.wrappedBuffer(randomBytes(headerSize)),
-                    Unpooled.wrappedBuffer((randomBytes(entrySize - headerSize))));
+            notArrayBackedBuffer = new CompositeByteBuf(ByteBufAllocator.DEFAULT, true, 2);
+            notArrayBackedBuffer.addComponent(Unpooled.wrappedBuffer(randomBytes(headerSize)));
+            notArrayBackedBuffer.addComponent(Unpooled.wrappedBuffer((randomBytes(entrySize - headerSize))));
 
             byteBufDefaultAlloc = ByteBufAllocator.DEFAULT.buffer(entrySize, entrySize);
             byteBufDefaultAlloc.writeBytes(randomBytes(entrySize));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services