You are viewing a plain text version of this content. The canonical link for it is available in the original mailing list archive.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/09 02:19:38 UTC

[GitHub] merlimat closed pull request #824: Refactored EntryLogger Buffered channel

merlimat closed pull request #824: Refactored EntryLogger Buffered channel
URL: https://github.com/apache/bookkeeper/pull/824
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub does not display the diff of a merged foreign pull request):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index fa4d41006..1d19a3024 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -23,6 +23,10 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -52,6 +56,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
@@ -102,6 +107,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Bookie Shell is to provide utilities for users to administer a bookkeeper cluster.
  */
@@ -2449,7 +2455,7 @@ public boolean accept(long ledgerId) {
                 return true;
             }
             @Override
-            public void process(long ledgerId, long startPos, ByteBuffer entry) {
+            public void process(long ledgerId, long startPos, ByteBuf entry) {
                 formatEntry(startPos, entry, printMsg);
             }
         });
@@ -2476,10 +2482,9 @@ public boolean accept(long ledgerId) {
             }
 
             @Override
-            public void process(long ledgerId, long startPos, ByteBuffer entry) {
-                long entrysLedgerId = entry.getLong();
-                long entrysEntryId = entry.getLong();
-                entry.rewind();
+            public void process(long ledgerId, long startPos, ByteBuf entry) {
+                long entrysLedgerId = entry.getLong(entry.readerIndex());
+                long entrysEntryId = entry.getLong(entry.readerIndex() + 8);
                 if ((ledgerId == entrysLedgerId) && ((entrysEntryId == entryId)) || (entryId == -1)) {
                     entryFound.setValue(true);
                     formatEntry(startPos, entry, printMsg);
@@ -2515,12 +2520,12 @@ public boolean accept(long ledgerId) {
             }
 
             @Override
-            public void process(long ledgerId, long entryStartPos, ByteBuffer entry) {
+            public void process(long ledgerId, long entryStartPos, ByteBuf entry) {
                 if (!stopScanning.booleanValue()) {
                     if ((rangeEndPos != -1) && (entryStartPos > rangeEndPos)) {
                         stopScanning.setValue(true);
                     } else {
-                        int entrySize = entry.limit();
+                        int entrySize = entry.readableBytes();
                         /**
                          * entrySize of an entry (inclusive of payload and
                          * header) value is stored as int value in log file, but
@@ -2562,7 +2567,7 @@ public void process(int journalVersion, long offset, ByteBuffer entry) throws IO
                     System.out.println("Journal Version : " + journalVersion);
                     printJournalVersion = true;
                 }
-                formatEntry(offset, entry, printMsg);
+                formatEntry(offset, Unpooled.wrappedBuffer(entry), printMsg);
             }
         });
     }
@@ -2608,17 +2613,17 @@ private void formatEntry(LedgerEntry entry, boolean printMsg) {
      * @param printMsg
      *          Whether printing the message body
      */
-    private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
-        long ledgerId = recBuff.getLong();
-        long entryId = recBuff.getLong();
-        int entrySize = recBuff.limit();
+    private void formatEntry(long pos, ByteBuf recBuff, boolean printMsg) {
+        int entrySize = recBuff.readableBytes();
+        long ledgerId = recBuff.readLong();
+        long entryId = recBuff.readLong();
 
         System.out.println("--------- Lid=" + ledgerId + ", Eid=" + entryId
                          + ", ByteOffset=" + pos + ", EntrySize=" + entrySize + " ---------");
         if (entryId == Bookie.METAENTRY_ID_LEDGER_KEY) {
-            int masterKeyLen = recBuff.getInt();
+            int masterKeyLen = recBuff.readInt();
             byte[] masterKey = new byte[masterKeyLen];
-            recBuff.get(masterKey);
+            recBuff.readBytes(masterKey);
             System.out.println("Type:           META");
             System.out.println("MasterKey:      " + bytes2Hex(masterKey));
             System.out.println();
@@ -2631,7 +2636,7 @@ private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
             return;
         }
         // process a data entry
-        long lastAddConfirmed = recBuff.getLong();
+        long lastAddConfirmed = recBuff.readLong();
         System.out.println("Type:           DATA");
         System.out.println("LastConfirmed:  " + lastAddConfirmed);
         if (!printMsg) {
@@ -2639,12 +2644,12 @@ private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
             return;
         }
         // skip digest checking
-        recBuff.position(32 + 8);
+        recBuff.skipBytes(8);
         System.out.println("Data:");
         System.out.println();
         try {
-            byte[] ret = new byte[recBuff.remaining()];
-            recBuff.get(ret);
+            byte[] ret = new byte[recBuff.readableBytes()];
+            recBuff.readBytes(ret);
             formatter.formatEntry(ret);
         } catch (Exception e) {
             System.out.println("N/A. Corrupted.");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 10affaeba..0d21d415c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -21,22 +21,25 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.bookkeeper.util.ZeroBuffer;
 
 /**
  * Provides a buffering layer in front of a FileChannel.
  */
-public class BufferedChannel extends BufferedReadChannel {
+public class BufferedChannel extends BufferedReadChannel implements Closeable {
     // The capacity of the write buffer.
     protected final int writeCapacity;
     // The position of the file channel's write pointer.
     protected AtomicLong writeBufferStartPosition = new AtomicLong(0);
     // The buffer used to write operations.
-    protected final ByteBuffer writeBuffer;
+    protected final ByteBuf writeBuffer;
     // The absolute position of the next write operation.
     protected volatile long position;
 
@@ -48,12 +51,15 @@ public BufferedChannel(FileChannel fc, int capacity) throws IOException {
 
     public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
         super(fc, readCapacity);
-        // Set the read buffer's limit to readCapacity.
-        this.readBuffer.limit(readCapacity);
         this.writeCapacity = writeCapacity;
         this.position = fc.position();
         this.writeBufferStartPosition.set(position);
-        this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
+        this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+    }
+
+    @Override
+    public void close() throws IOException {
+        writeBuffer.release();
     }
 
     /**
@@ -64,19 +70,16 @@ public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) thro
      * @param src The source ByteBuffer which contains the data to be written.
      * @throws IOException if a write operation fails.
      */
-    public synchronized void write(ByteBuffer src) throws IOException {
+    public synchronized void write(ByteBuf src) throws IOException {
         int copied = 0;
-        while (src.remaining() > 0) {
-            int truncated = 0;
-            if (writeBuffer.remaining() < src.remaining()) {
-                truncated = src.remaining() - writeBuffer.remaining();
-                src.limit(src.limit() - truncated);
-            }
-            copied += src.remaining();
-            writeBuffer.put(src);
-            src.limit(src.limit() + truncated);
+        int len = src.readableBytes();
+        while (copied < len) {
+            int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
+            writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
+            copied += bytesToCopy;
+
             // if we have run out of buffer space, we should flush to the file
-            if (writeBuffer.remaining() == 0) {
+            if (!writeBuffer.isWritable()) {
                 flushInternal();
             }
         }
@@ -121,10 +124,10 @@ public void flush(boolean shouldForceWrite) throws IOException {
      * @throws IOException if the write fails.
      */
     private void flushInternal() throws IOException {
-        writeBuffer.flip();
+        ByteBuffer toWrite = writeBuffer.internalNioBuffer(0, writeBuffer.writerIndex());
         do {
-            fileChannel.write(writeBuffer);
-        } while (writeBuffer.hasRemaining());
+            fileChannel.write(toWrite);
+        } while (toWrite.hasRemaining());
         writeBuffer.clear();
         writeBufferStartPosition.set(fileChannel.position());
     }
@@ -140,57 +143,41 @@ public long forceWrite(boolean forceMetadata) throws IOException {
     }
 
     @Override
-    public synchronized int read(ByteBuffer dest, long pos) throws IOException {
+    public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
         long prevPos = pos;
-        while (dest.remaining() > 0) {
+        while (length > 0) {
             // check if it is in the write buffer
             if (writeBuffer != null && writeBufferStartPosition.get() <= pos) {
-                long positionInBuffer = pos - writeBufferStartPosition.get();
-                long bytesToCopy = writeBuffer.position() - positionInBuffer;
-                if (bytesToCopy > dest.remaining()) {
-                    bytesToCopy = dest.remaining();
-                }
+                int positionInBuffer = (int) (pos - writeBufferStartPosition.get());
+                int bytesToCopy = Math.min(writeBuffer.writerIndex() - positionInBuffer, dest.writableBytes());
+
                 if (bytesToCopy == 0) {
                     throw new IOException("Read past EOF");
                 }
-                ByteBuffer src = writeBuffer.duplicate();
-                src.position((int) positionInBuffer);
-                src.limit((int) (positionInBuffer + bytesToCopy));
-                dest.put(src);
+
+                dest.writeBytes(writeBuffer, positionInBuffer, bytesToCopy);
                 pos += bytesToCopy;
+                length -= bytesToCopy;
             } else if (writeBuffer == null && writeBufferStartPosition.get() <= pos) {
                 // here we reach the end
                 break;
                 // first check if there is anything we can grab from the readBuffer
-            } else if (readBufferStartPosition <= pos && pos < readBufferStartPosition + readBuffer.capacity()) {
-                long positionInBuffer = pos - readBufferStartPosition;
-                long bytesToCopy = readBuffer.capacity() - positionInBuffer;
-                if (bytesToCopy > dest.remaining()) {
-                    bytesToCopy = dest.remaining();
-                }
-                ByteBuffer src = readBuffer.duplicate();
-                src.position((int) positionInBuffer);
-                src.limit((int) (positionInBuffer + bytesToCopy));
-                dest.put(src);
+            } else if (readBufferStartPosition <= pos && pos < readBufferStartPosition + readBuffer.writerIndex()) {
+                int positionInBuffer = (int) (pos - readBufferStartPosition);
+                int bytesToCopy = Math.min(readBuffer.writerIndex() - positionInBuffer, dest.writableBytes());
+                dest.writeBytes(readBuffer, positionInBuffer, bytesToCopy);
                 pos += bytesToCopy;
+                length -= bytesToCopy;
                 // let's read it
             } else {
                 readBufferStartPosition = pos;
-                readBuffer.clear();
-                // make sure that we don't overlap with the write buffer
-                if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition.get()) {
-                    readBufferStartPosition = writeBufferStartPosition.get() - readBuffer.capacity();
-                    if (readBufferStartPosition < 0) {
-                        ZeroBuffer.put(readBuffer, (int) -readBufferStartPosition);
-                    }
-                }
-                while (readBuffer.remaining() > 0) {
-                    if (fileChannel.read(readBuffer, readBufferStartPosition + readBuffer.position()) <= 0) {
-                        throw new IOException("Short read");
-                    }
+
+                int readBytes = fileChannel.read(readBuffer.internalNioBuffer(0, readCapacity),
+                        readBufferStartPosition);
+                if (readBytes <= 0) {
+                    throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
                 }
-                ZeroBuffer.put(readBuffer);
-                readBuffer.clear();
+                readBuffer.writerIndex(readBytes);
             }
         }
         return (int) (pos - prevPos);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
index ef4d6f31c..87e1d4355 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
@@ -21,7 +21,7 @@
 import java.nio.channels.FileChannel;
 
 /**
- * A {@code BufferedChannelBase} adds functionlity to an existing file channel, the ability
+ * A {@code BufferedChannelBase} adds functionality to an existing file channel, the ability
  * to buffer the input and output data. This class is a base class for wrapping the {@link FileChannel}.
  */
 public abstract class BufferedChannelBase {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
index 64557d120..96dea6f67 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
@@ -21,19 +21,24 @@
 
 package org.apache.bookkeeper.bookie;
 
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
  * A Buffered channel without a write buffer. Only reads are buffered.
  */
-public class BufferedReadChannel extends BufferedChannelBase {
+public class BufferedReadChannel extends BufferedChannelBase  {
 
     // The capacity of the read buffer.
     protected final int readCapacity;
+
     // The buffer for read operations.
-    protected ByteBuffer readBuffer;
+    protected final ByteBuf readBuffer;
+
     // The starting position of the data currently in the read buffer.
     protected long readBufferStartPosition = Long.MIN_VALUE;
 
@@ -43,8 +48,7 @@
     public BufferedReadChannel(FileChannel fileChannel, int readCapacity) throws IOException {
         super(fileChannel);
         this.readCapacity = readCapacity;
-        this.readBuffer = ByteBuffer.allocateDirect(readCapacity);
-        this.readBuffer.limit(0);
+        this.readBuffer = Unpooled.buffer(readCapacity);
     }
 
     /**
@@ -57,7 +61,11 @@ public BufferedReadChannel(FileChannel fileChannel, int readCapacity) throws IOE
      *         -1 if the given position is greater than or equal to the file's current size.
      * @throws IOException if I/O error occurs
      */
-    public synchronized int read(ByteBuffer dest, long pos) throws IOException {
+    public int read(ByteBuf dest, long pos) throws IOException {
+        return read(dest, pos, dest.writableBytes());
+    }
+
+    public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
         invocationCount++;
         long currentPosition = pos;
         long eof = validateAndGetFileChannel().size();
@@ -65,30 +73,28 @@ public synchronized int read(ByteBuffer dest, long pos) throws IOException {
         if (pos >= eof) {
             return -1;
         }
-        while (dest.remaining() > 0) {
+        while (length > 0) {
             // Check if the data is in the buffer, if so, copy it.
             if (readBufferStartPosition <= currentPosition
-                    && currentPosition < readBufferStartPosition + readBuffer.limit()) {
-                long posInBuffer = currentPosition - readBufferStartPosition;
-                long bytesToCopy = Math.min(dest.remaining(), readBuffer.limit() - posInBuffer);
-                ByteBuffer rbDup = readBuffer.duplicate();
-                rbDup.position((int) posInBuffer);
-                rbDup.limit((int) (posInBuffer + bytesToCopy));
-                dest.put(rbDup);
+                    && currentPosition < readBufferStartPosition + readBuffer.readableBytes()) {
+                int posInBuffer = (int) (currentPosition - readBufferStartPosition);
+                int bytesToCopy = Math.min(length, readBuffer.readableBytes() - posInBuffer);
+                dest.writeBytes(readBuffer, posInBuffer, bytesToCopy);
                 currentPosition += bytesToCopy;
+                length -= bytesToCopy;
                 cacheHitCount++;
             } else if (currentPosition >= eof) {
                 // here we reached eof.
                 break;
             } else {
                 // We don't have it in the buffer, so put necessary data in the buffer
-                readBuffer.clear();
                 readBufferStartPosition = currentPosition;
                 int readBytes = 0;
-                if ((readBytes = validateAndGetFileChannel().read(readBuffer, currentPosition)) <= 0) {
+                if ((readBytes = validateAndGetFileChannel().read(readBuffer.internalNioBuffer(0, readCapacity),
+                        currentPosition)) <= 0) {
                     throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
                 }
-                readBuffer.limit(readBytes);
+                readBuffer.writerIndex(readBytes);
             }
         }
         return (int) (currentPosition - pos);
@@ -96,7 +102,6 @@ public synchronized int read(ByteBuffer dest, long pos) throws IOException {
 
     public synchronized void clear() {
         readBuffer.clear();
-        readBuffer.limit(0);
     }
 
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
index c31b98962..88e22bf67 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -21,8 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -85,15 +86,13 @@ public boolean accept(long ledgerId) {
                 }
 
                 @Override
-                public void process(final long ledgerId, long offset, ByteBuffer entry) throws IOException {
-                    throttler.acquire(entry.remaining());
+                public void process(final long ledgerId, long offset, ByteBuf entry) throws IOException {
+                    throttler.acquire(entry.readableBytes());
 
                     if (offsets.size() > maxOutstandingRequests) {
                         flush();
                     }
-                    entry.getLong(); // discard ledger id, we already have it
-                    long entryId = entry.getLong();
-                    entry.rewind();
+                    long entryId = entry.getLong(entry.readerIndex() + 8);
 
                     long newoffset = entryLogger.addEntry(ledgerId, entry);
                     offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index c00c84c59..d2291513d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -28,9 +28,10 @@
 import com.google.common.collect.MapMaker;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.FastThreadLocal;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -60,7 +61,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -164,7 +164,7 @@ public ConcurrentLongLongHashMap getLedgersMap() {
      * </pre>
      */
     static final int LOGFILE_HEADER_SIZE = 1024;
-    final ByteBuffer logfileHeader = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+    final ByteBuf logfileHeader = Unpooled.buffer(LOGFILE_HEADER_SIZE);
 
     static final int HEADER_VERSION_POSITION = 4;
     static final int LEDGERS_MAP_OFFSET_POSITION = HEADER_VERSION_POSITION + 4;
@@ -222,10 +222,10 @@ public ConcurrentLongLongHashMap getLedgersMap() {
          * @param offset
          *          File offset of this entry.
          * @param entry
-         *          Entry ByteBuffer
+         *          Entry ByteBuf
          * @throws IOException
          */
-        void process(long ledgerId, long offset, ByteBuffer entry) throws IOException;
+        void process(long ledgerId, long offset, ByteBuf entry) throws IOException;
     }
 
     /**
@@ -265,8 +265,9 @@ public EntryLogger(ServerConfiguration conf,
         // within the same JVM. All of these Bookie instances access this header
         // so there can be race conditions when entry logs are rolled over and
         // this header buffer is cleared before writing it into the new logChannel.
-        logfileHeader.put("BKLO".getBytes(UTF_8));
-        logfileHeader.putInt(HEADER_CURRENT_VERSION);
+        logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
+        logfileHeader.writeInt(HEADER_CURRENT_VERSION);
+        logfileHeader.writerIndex(LOGFILE_HEADER_SIZE);
 
         // Find the largest logId
         long logId = INVALID_LID;
@@ -307,13 +308,13 @@ void addListener(EntryLogListener listener) {
      * @param pos The starting position from where we want to read.
      * @return
      */
-    private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuffer buff, long pos)
+    private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos)
             throws IOException {
         BufferedLogChannel bc = logChannel;
         if (null != bc) {
             if (entryLogId == bc.getLogId()) {
                 synchronized (bc) {
-                    if (pos + buff.remaining() >= bc.getFileChannelPosition()) {
+                    if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
                         return bc.read(buff, pos);
                     }
                 }
@@ -513,9 +514,9 @@ private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOExcep
         int numberOfLedgers = (int) ledgersMap.size();
 
         // Write the ledgers map into several batches
-        final AtomicLong currentOffset = new AtomicLong(ledgerMapOffset);
+
         final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        final ByteBuffer serializedMap = ByteBuffer.allocate(maxMapSize);
+        final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
 
         try {
             ledgersMap.forEach(new BiConsumerLong() {
@@ -530,25 +531,23 @@ public void accept(long ledgerId, long size) {
                         int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
 
                         serializedMap.clear();
-                        serializedMap.putInt(ledgerMapSize - 4);
-                        serializedMap.putLong(INVALID_LID);
-                        serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
-                        serializedMap.putInt(batchSize);
+                        serializedMap.writeInt(ledgerMapSize - 4);
+                        serializedMap.writeLong(INVALID_LID);
+                        serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+                        serializedMap.writeInt(batchSize);
 
                         startNewBatch = false;
                         remainingInBatch = batchSize;
                     }
                     // Dump the ledger in the current batch
-                    serializedMap.putLong(ledgerId);
-                    serializedMap.putLong(size);
+                    serializedMap.writeLong(ledgerId);
+                    serializedMap.writeLong(size);
                     --remainingLedgers;
 
                     if (--remainingInBatch == 0) {
                         // Close current batch
-                        serializedMap.flip();
                         try {
-                            int written = entryLogChannel.fileChannel.write(serializedMap, currentOffset.get());
-                            currentOffset.addAndGet(written);
+                            entryLogChannel.write(serializedMap);
                         } catch (IOException e) {
                             throw new RuntimeException(e);
                         }
@@ -563,6 +562,8 @@ public void accept(long ledgerId, long size) {
             } else {
                 throw e;
             }
+        } finally {
+            serializedMap.release();
         }
 
         // Update the headers with the map offset and count of ledgers
@@ -662,7 +663,8 @@ private BufferedLogChannel allocateNewLog(String suffix) throws IOException {
             FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
             BufferedLogChannel logChannel = new BufferedLogChannel(channel,
                     conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId, newLogFile);
-            logChannel.write((ByteBuffer) logfileHeader.clear());
+            logfileHeader.readerIndex(0);
+            logChannel.write(logfileHeader);
 
             for (File f : list) {
                 setLastLogId(f, preallocatedLogId);
@@ -851,11 +853,22 @@ synchronized void flushCurrentLog() throws IOException {
     }
 
     long addEntry(long ledger, ByteBuffer entry) throws IOException {
+        return addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
+    }
+
+    long addEntry(long ledger, ByteBuf entry) throws IOException {
         return addEntry(ledger, entry, true);
     }
 
-    synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throws IOException {
-        int entrySize = entry.remaining() + 4;
+    private final FastThreadLocal<ByteBuf> sizeBuffer = new FastThreadLocal<ByteBuf>() {
+        @Override
+        protected ByteBuf initialValue() throws Exception {
+            return Unpooled.buffer(4);
+        }
+    };
+
+    synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
         boolean reachEntryLogLimit =
             rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
         // Create new log if logSizeLimit reached or current disk is full
@@ -871,12 +884,11 @@ synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throw
             }
         }
 
-        // Get a buffer from recyclable pool to store the size
-        RecyclableByteBuffer recyclableBuffer = RecyclableByteBuffer.get();
-        recyclableBuffer.buffer.putInt(entry.remaining());
-        recyclableBuffer.buffer.flip();
-        logChannel.write(recyclableBuffer.buffer);
-        recyclableBuffer.recycle();
+        // Get a buffer from thread local to store the size
+        ByteBuf sizeBuffer = this.sizeBuffer.get();
+        sizeBuffer.clear();
+        sizeBuffer.writeInt(entry.readableBytes());
+        logChannel.write(sizeBuffer);
 
         long pos = logChannel.position();
         logChannel.write(entry);
@@ -887,16 +899,18 @@ synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throw
         return (logChannel.getLogId() << 32L) | pos;
     }
 
-    long addEntryForCompaction(long ledgerId, ByteBuffer entry) throws IOException {
+    long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
         synchronized (compactionLogLock) {
-            int entrySize = entry.remaining() + 4;
+            int entrySize = entry.readableBytes() + 4;
             if (compactionLogChannel == null) {
                 createNewCompactionLog();
             }
-            ByteBuffer buff = ByteBuffer.allocate(4);
-            buff.putInt(entry.remaining());
-            buff.flip();
-            compactionLogChannel.write(buff);
+
+            ByteBuf sizeBuffer = this.sizeBuffer.get();
+            sizeBuffer.clear();
+            sizeBuffer.writeInt(entry.readableBytes());
+            compactionLogChannel.write(sizeBuffer);
+
             long pos = compactionLogChannel.position();
             compactionLogChannel.write(entry);
             compactionLogChannel.registerWrittenEntry(ledgerId, entrySize);
@@ -950,32 +964,6 @@ void removeCurCompactionLog() {
     }
 
 
-    private static final class RecyclableByteBuffer {
-        private static final Recycler<RecyclableByteBuffer> RECYCLER = new  Recycler<RecyclableByteBuffer>() {
-            @Override
-            protected RecyclableByteBuffer newObject(Handle<RecyclableByteBuffer> handle) {
-                return new RecyclableByteBuffer(handle);
-            }
-        };
-
-        private final ByteBuffer buffer;
-        private final Handle<RecyclableByteBuffer> handle;
-        public RecyclableByteBuffer(Handle<RecyclableByteBuffer> handle) {
-            this.buffer = ByteBuffer.allocate(4);
-            this.handle = handle;
-        }
-
-        public static RecyclableByteBuffer get() {
-            return RECYCLER.get();
-        }
-
-        public void recycle() {
-            buffer.rewind();
-            handle.recycle(this);
-        }
-    }
-
-
     private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
         if (!doRegularFlushes) {
             return;
@@ -1001,7 +989,8 @@ synchronized boolean readEntryLogHardLimit(long size) {
     ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
         long pos = location & 0xffffffffL;
-        RecyclableByteBuffer sizeBuff = RecyclableByteBuffer.get();
+        ByteBuf sizeBuff = sizeBuffer.get();
+        sizeBuff.clear();
         pos -= 4; // we want to get the ledgerId and length to check
         BufferedReadChannel fc;
         try {
@@ -1013,14 +1002,12 @@ ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException
             throw newe;
         }
 
-        if (readFromLogChannel(entryLogId, fc, sizeBuff.buffer, pos) != sizeBuff.buffer.capacity()) {
+        if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != sizeBuff.capacity()) {
             throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
                                               ledgerId, entryId);
         }
         pos += 4;
-        sizeBuff.buffer.flip();
-        int entrySize = sizeBuff.buffer.getInt();
-        sizeBuff.recycle();
+        int entrySize = sizeBuff.readInt();
 
         // entrySize does not include the ledgerId
         if (entrySize > maxSaneEntrySize) {
@@ -1033,7 +1020,7 @@ ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException
         }
 
         ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entrySize, entrySize);
-        int rc = readFromLogChannel(entryLogId, fc, data.nioBuffer(0, entrySize), pos);
+        int rc = readFromLogChannel(entryLogId, fc, data, pos);
         if (rc != entrySize) {
             // Note that throwing NoEntryException here instead of IOException is not
             // without risk. If all bookies in a quorum throw this same exception
@@ -1042,6 +1029,7 @@ ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException
             // could have occurred, where the length of the entry was corrupted on all
             // replicas. However, the chance of this happening is very very low, so
             // returning NoEntryException is mostly safe.
+            data.release();
             throw new Bookie.NoEntryException("Short read for " + ledgerId + "@"
                                               + entryId + " in " + entryLogId + "@"
                                               + pos + "(" + rc + "!=" + entrySize + ")", ledgerId, entryId);
@@ -1049,11 +1037,13 @@ ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException
         data.writerIndex(entrySize);
         long thisLedgerId = data.getLong(0);
         if (thisLedgerId != ledgerId) {
+            data.release();
             throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos
                     + " entry belongs to " + thisLedgerId + " not " + ledgerId);
         }
         long thisEntryId = data.getLong(8);
         if (thisEntryId != entryId) {
+            data.release();
             throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos
                     + " entry is " + thisEntryId + " not " + entryId);
         }
@@ -1068,21 +1058,24 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
         BufferedReadChannel bc = getChannelForLogId(entryLogId);
 
         // Allocate buffer to read (version, ledgersMapOffset, ledgerCount)
-        ByteBuffer headers = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
-        bc.read(headers, 0);
-        headers.flip();
+        ByteBuf headers = PooledByteBufAllocator.DEFAULT.directBuffer(LOGFILE_HEADER_SIZE);
+        try {
+            bc.read(headers, 0);
 
-        // Skip marker string "BKLO"
-        headers.getInt();
+            // Skip marker string "BKLO"
+            headers.readInt();
 
-        int headerVersion = headers.getInt();
-        if (headerVersion < HEADER_V0 || headerVersion > HEADER_CURRENT_VERSION) {
-            LOG.info("Unknown entry log header version for log {}: {}", entryLogId, headerVersion);
-        }
+            int headerVersion = headers.readInt();
+            if (headerVersion < HEADER_V0 || headerVersion > HEADER_CURRENT_VERSION) {
+                LOG.info("Unknown entry log header version for log {}: {}", entryLogId, headerVersion);
+            }
 
-        long ledgersMapOffset = headers.getLong();
-        int ledgersCount = headers.getInt();
-        return new Header(headerVersion, ledgersMapOffset, ledgersCount);
+            long ledgersMapOffset = headers.readLong();
+            int ledgersCount = headers.readInt();
+            return new Header(headerVersion, ledgersMapOffset, ledgersCount);
+        } finally {
+            headers.release();
+        }
     }
 
     private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
@@ -1137,8 +1130,8 @@ private File findFile(long logId) throws FileNotFoundException {
      * @throws IOException
      */
     protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
-        ByteBuffer sizeBuff = ByteBuffer.allocate(4);
-        ByteBuffer lidBuff = ByteBuffer.allocate(8);
+        // Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes)
+        ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
         BufferedReadChannel bc;
         // Get the BufferedChannel for the current entry log file
         try {
@@ -1151,49 +1144,50 @@ protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOE
         // the header where all of the ledger entries are.
         long pos = LOGFILE_HEADER_SIZE;
 
-        // Read through the entry log file and extract the ledger ID's.
-        while (true) {
-            // Check if we've finished reading the entry log file.
-            if (pos >= bc.size()) {
-                break;
-            }
-            if (readFromLogChannel(entryLogId, bc, sizeBuff, pos) != sizeBuff.capacity()) {
-                LOG.warn("Short read for entry size from entrylog {}", entryLogId);
-                return;
-            }
-            long offset = pos;
-            pos += 4;
-            sizeBuff.flip();
-            int entrySize = sizeBuff.getInt();
-
-            sizeBuff.clear();
-            // try to read ledger id first
-            if (readFromLogChannel(entryLogId, bc, lidBuff, pos) != lidBuff.capacity()) {
-                LOG.warn("Short read for ledger id from entrylog {}", entryLogId);
-                return;
-            }
-            lidBuff.flip();
-            long lid = lidBuff.getLong();
-            lidBuff.clear();
-            if (lid == INVALID_LID || !scanner.accept(lid)) {
-                // skip this entry
+        // Start with a reasonably sized buffer size
+        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024);
+
+        try {
+
+            // Read through the entry log file and extract the ledger ID's.
+            while (true) {
+                // Check if we've finished reading the entry log file.
+                if (pos >= bc.size()) {
+                    break;
+                }
+                if (readFromLogChannel(entryLogId, bc, headerBuffer, pos) != headerBuffer.capacity()) {
+                    LOG.warn("Short read for entry size from entrylog {}", entryLogId);
+                    return;
+                }
+                long offset = pos;
+                pos += 4;
+                int entrySize = headerBuffer.readInt();
+                long ledgerId = headerBuffer.readLong();
+                headerBuffer.clear();
+
+                if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {
+                    // skip this entry
+                    pos += entrySize;
+                    continue;
+                }
+                // read the entry
+
+                data.clear();
+                data.capacity(entrySize);
+                int rc = readFromLogChannel(entryLogId, bc, data, pos);
+                if (rc != entrySize) {
+                    LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})",
+                            new Object[] { entryLogId, pos, rc, entrySize });
+                    return;
+                }
+                // process the entry
+                scanner.process(ledgerId, offset, data);
+
+                // Advance position to the next entry
                 pos += entrySize;
-                continue;
-            }
-            // read the entry
-            byte data[] = new byte[entrySize];
-            ByteBuffer buff = ByteBuffer.wrap(data);
-            int rc = readFromLogChannel(entryLogId, bc, buff, pos);
-            if (rc != data.length) {
-                LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})", new Object[] { entryLogId, pos,
-                        rc, data.length });
-                return;
             }
-            buff.flip();
-            // process the entry
-            scanner.process(lid, offset, buff);
-            // Advance position to the next entry
-            pos += entrySize;
+        } finally {
+            data.release();
         }
     }
 
@@ -1232,53 +1226,55 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce
         long offset = header.ledgersMapOffset;
         EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
 
-        while (offset < bc.size()) {
-            // Read ledgers map size
-            ByteBuffer sizeBuf = ByteBuffer.allocate(4);
-            bc.read(sizeBuf, offset);
-            sizeBuf.flip();
+        final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+        ByteBuf ledgersMap = ByteBufAllocator.DEFAULT.directBuffer(maxMapSize);
 
-            int ledgersMapSize = sizeBuf.getInt();
+        try {
+            while (offset < bc.size()) {
+                // Read ledgers map size
+                sizeBuffer.get().clear();
+                bc.read(sizeBuffer.get(), offset);
 
-            // Read the index into a buffer
-            ByteBuffer ledgersMapBuffer = ByteBuffer.allocate(ledgersMapSize);
-            bc.read(ledgersMapBuffer, offset + 4);
-            ledgersMapBuffer.flip();
+                int ledgersMapSize = sizeBuffer.get().readInt();
 
-            // Discard ledgerId and entryId
-            long lid = ledgersMapBuffer.getLong();
-            if (lid != INVALID_LID) {
-                throw new IOException("Cannot deserialize ledgers map from ledger " + lid + " -- entryLogId: "
-                        + entryLogId);
-            }
+                // Read the index into a buffer
+                ledgersMap.clear();
+                bc.read(ledgersMap, offset + 4, ledgersMapSize);
 
-            long entryId = ledgersMapBuffer.getLong();
-            if (entryId != LEDGERS_MAP_ENTRY_ID) {
-                throw new IOException("Cannot deserialize ledgers map from ledger " + lid + ":" + entryId
-                        + " -- entryLogId: " + entryLogId);
-            }
+                // Discard ledgerId and entryId
+                long lid = ledgersMap.readLong();
+                if (lid != INVALID_LID) {
+                    throw new IOException("Cannot deserialize ledgers map from ledger " + lid);
+                }
+
+                long entryId = ledgersMap.readLong();
+                if (entryId != LEDGERS_MAP_ENTRY_ID) {
+                    throw new IOException("Cannot deserialize ledgers map from entryId " + entryId);
+                }
 
-            // Read the number of ledgers in the current entry batch
-            int ledgersCount = ledgersMapBuffer.getInt();
+                // Read the number of ledgers in the current entry batch
+                int ledgersCount = ledgersMap.readInt();
 
-            // Extract all (ledger,size) tuples from buffer
-            for (int i = 0; i < ledgersCount; i++) {
-                long ledgerId = ledgersMapBuffer.getLong();
-                long size = ledgersMapBuffer.getLong();
+                // Extract all (ledger,size) tuples from buffer
+                for (int i = 0; i < ledgersCount; i++) {
+                    long ledgerId = ledgersMap.readLong();
+                    long size = ledgersMap.readLong();
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Recovering ledgers maps for log {} -- Found ledger: {} with size: {}",
-                            new Object[] { entryLogId, ledgerId, size });
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Recovering ledgers maps for log {} -- Found ledger: {} with size: {}",
+                                new Object[] { entryLogId, ledgerId, size });
+                    }
+                    meta.addLedgerSize(ledgerId, size);
+                }
+                if (ledgersMap.isReadable()) {
+                    throw new IOException("Invalid entry size when reading ledgers map");
                 }
-                meta.addLedgerSize(ledgerId, size);
-            }
 
-            if (ledgersMapBuffer.hasRemaining()) {
-                throw new IOException("Invalid entry size when reading ledgers map on entryLogId: " + entryLogId);
+                // Move to next entry, if any
+                offset += ledgersMapSize + 4;
             }
-
-            // Move to next entry, if any
-            offset += ledgersMapSize + 4;
+        } finally {
+            ledgersMap.release();
         }
 
         if (meta.getLedgersMap().size() != header.ledgersCount) {
@@ -1295,9 +1291,9 @@ private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId) thro
         // Read through the entry log file and extract the entry log meta
         scanEntryLog(entryLogId, new EntryLogScanner() {
             @Override
-            public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException {
+            public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
                 // add new entry size of a ledger to entry log meta
-                meta.addLedgerSize(ledgerId, entry.limit() + 4);
+                meta.addLedgerSize(ledgerId, entry.readableBytes() + 4);
             }
 
             @Override
@@ -1351,6 +1347,7 @@ private static void closeFileChannel(BufferedChannelBase channel) throws IOExcep
         if (null == channel) {
             return;
         }
+
         FileChannel fileChannel = channel.getFileChannel();
         if (null != fileChannel) {
             fileChannel.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index c8f94833f..50e93a3dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -248,7 +248,7 @@ private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint)
                         ledger = kv.getLedgerId();
                         if (ledgerGC != ledger) {
                             try {
-                                flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer().nioBuffer());
+                                flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
                             } catch (NoLedgerException exception) {
                                 ledgerGC = ledger;
                             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6db3e6d90..2b357eee4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -26,16 +26,18 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
 import com.google.common.collect.Lists;
+
 import io.netty.buffer.ByteBuf;
+
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Observable;
 import java.util.Observer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -263,7 +265,7 @@ public synchronized long addEntry(ByteBuf entry) throws IOException {
         long entryId = entry.getLong(entry.readerIndex() + 8);
         long lac = entry.getLong(entry.readerIndex() + 16);
 
-        processEntry(ledgerId, entryId, entry.nioBuffer());
+        processEntry(ledgerId, entryId, entry);
 
         ledgerCache.updateLastAddConfirmed(ledgerId, lac);
         return entryId;
@@ -409,11 +411,11 @@ public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
         ledgerDeletionListeners.add(listener);
     }
 
-    protected void processEntry(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+    protected void processEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException {
         processEntry(ledgerId, entryId, entry, true);
     }
 
-    protected synchronized void processEntry(long ledgerId, long entryId, ByteBuffer entry, boolean rollLog)
+    protected synchronized void processEntry(long ledgerId, long entryId, ByteBuf entry, boolean rollLog)
             throws IOException {
         /*
          * Touch dirty flag
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index d3c2144ba..faac1b443 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import com.google.common.base.Stopwatch;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
@@ -39,6 +40,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -49,7 +51,6 @@
 import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZeroBuffer;
 import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
 import org.apache.bookkeeper.util.collections.RecyclableArrayList;
 import org.slf4j.Logger;
@@ -291,7 +292,7 @@ public boolean accept(long journalId) {
         static QueueEntry create(ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx,
                 long enqueueTime, OpStatsLogger journalAddEntryStats) {
             QueueEntry qe = RECYCLER.get();
-            qe.entry = entry.duplicate();
+            qe.entry = entry;
             qe.cb = cb;
             qe.ctx = ctx;
             qe.ledgerId = ledgerId;
@@ -508,7 +509,7 @@ void shutdown() throws InterruptedException {
 
     static final int PADDING_MASK = -0x100;
 
-    static void writePaddingBytes(JournalChannel jc, ByteBuffer paddingBuffer, int journalAlignSize)
+    static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int journalAlignSize)
             throws IOException {
         int bytesToAlign = (int) (jc.bc.position() % journalAlignSize);
         if (0 != bytesToAlign) {
@@ -520,14 +521,13 @@ static void writePaddingBytes(JournalChannel jc, ByteBuffer paddingBuffer, int j
             }
             paddingBuffer.clear();
             // padding mask
-            paddingBuffer.putInt(PADDING_MASK);
+            paddingBuffer.writeInt(PADDING_MASK);
             // padding len
-            paddingBuffer.putInt(paddingBytes);
+            paddingBuffer.writeInt(paddingBytes);
             // padding bytes
-            paddingBuffer.position(8 + paddingBytes);
+            paddingBuffer.writerIndex(paddingBuffer.writerIndex() + paddingBytes);
 
-            paddingBuffer.flip();
-            jc.preAllocIfNeeded(paddingBuffer.limit());
+            jc.preAllocIfNeeded(paddingBuffer.readableBytes());
             // write padding bytes
             jc.bc.write(paddingBuffer);
         }
@@ -860,9 +860,9 @@ public void run() {
         LOG.info("Starting journal on {}", journalDirectory);
 
         RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
-        ByteBuffer lenBuff = ByteBuffer.allocate(4);
-        ByteBuffer paddingBuff = ByteBuffer.allocate(2 * conf.getJournalAlignmentSize());
-        ZeroBuffer.put(paddingBuff);
+        ByteBuf lenBuff = Unpooled.buffer(4);
+        ByteBuf paddingBuff = Unpooled.buffer(2 * conf.getJournalAlignmentSize());
+        paddingBuff.writeZero(paddingBuff.capacity());
         final int journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
         final int journalAlignmentSize = conf.getJournalAlignmentSize();
         JournalChannel logFile = null;
@@ -1020,24 +1020,20 @@ public void run() {
                     continue;
                 }
 
-                journalWriteBytes.add(qe.entry.readableBytes());
+                int entrySize = qe.entry.readableBytes();
+                journalWriteBytes.add(entrySize);
                 journalQueueSize.dec();
 
-                batchSize += (4 + qe.entry.readableBytes());
+                batchSize += (4 + entrySize);
 
                 lenBuff.clear();
-                lenBuff.putInt(qe.entry.readableBytes());
-                lenBuff.flip();
+                lenBuff.writeInt(entrySize);
 
                 // preAlloc based on size
-                logFile.preAllocIfNeeded(4 + qe.entry.readableBytes());
+                logFile.preAllocIfNeeded(4 + entrySize);
 
-                //
-                // we should be doing the following, but then we run out of
-                // direct byte buffers
-                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
                 bc.write(lenBuff);
-                bc.write(qe.entry.nioBuffer());
+                bc.write(qe.entry);
                 qe.entry.release();
 
                 toFlush.add(qe);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
index 96fbbe1fd..abfb264aa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SkipListFlusher.java
@@ -21,8 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 /**
  * Flush entries from skip list.
@@ -36,5 +37,5 @@
      * @param entry Entry ByteBuffer
      * @throws IOException
      */
-    void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException;
+    void process(long ledgerId, long entryId, ByteBuf entry) throws IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 972f0852b..b0c6bad08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -24,7 +24,6 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -177,7 +176,7 @@ public void checkpoint(final Checkpoint checkpoint) throws IOException {
 
     @Override
     public void process(long ledgerId, long entryId,
-                        ByteBuffer buffer) throws IOException {
+                        ByteBuf buffer) throws IOException {
         processEntry(ledgerId, entryId, buffer, false);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index e5fef35d2..67c11062a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -21,9 +21,10 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -172,18 +173,17 @@ public boolean accept(long ledgerId) {
                 }
 
                 @Override
-                public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException {
-                    throttler.acquire(entry.remaining());
+                public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
+                    throttler.acquire(entry.readableBytes());
                     synchronized (TransactionalEntryLogCompactor.this) {
-                        long lid = entry.getLong();
-                        long entryId = entry.getLong();
+                        long lid = entry.getLong(entry.readerIndex());
+                        long entryId = entry.getLong(entry.readerIndex() + 8);
                         if (lid != ledgerId || entryId < -1) {
                             LOG.warn("Scanning expected ledgerId {}, but found invalid entry "
                                     + "with ledgerId {} entryId {} at offset {}",
                                 new Object[]{ledgerId, lid, entryId, offset});
                             throw new IOException("Invalid entry found @ offset " + offset);
                         }
-                        entry.rewind();
                         long newOffset = entryLogger.addEntryForCompaction(ledgerId, entry);
                         offsets.add(new EntryLocation(ledgerId, entryId, newOffset));
 
@@ -356,16 +356,15 @@ public boolean accept(long ledgerId) {
                 }
 
                 @Override
-                public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException {
-                    long lid = entry.getLong();
-                    long entryId = entry.getLong();
+                public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
+                    long lid = entry.getLong(entry.readerIndex());
+                    long entryId = entry.getLong(entry.readerIndex() + 8);
                     if (lid != ledgerId || entryId < -1) {
                         LOG.warn("Scanning expected ledgerId {}, but found invalid entry "
                                 + "with ledgerId {} entryId {} at offset {}",
                             new Object[]{ledgerId, lid, entryId, offset});
                         throw new IOException("Invalid entry found @ offset " + offset);
                     }
-                    entry.rewind();
                     long location = (compactedLogId << 32L) | (offset + 4);
                     offsets.add(new EntryLocation(lid, entryId, location));
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
index d3ae7467f..e55dca28f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -120,11 +120,6 @@ public int capacity() {
         return b1.capacity() + b2.capacity();
     }
 
-    @Override
-    public int readableBytes() {
-        return b1.readableBytes() + b2.readableBytes();
-    }
-
     @Override
     public int writableBytes() {
         return 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 8332b341f..6f75bbe44 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -20,6 +20,11 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -29,8 +34,8 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.bookkeeper.client.ClientUtil;
@@ -38,14 +43,11 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.ZeroBuffer;
 import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.Test;
-import org.junit.After;
-
-import static org.junit.Assert.*;
 
 public class BookieJournalTest {
     private final static Logger LOG = LoggerFactory.getLogger(BookieJournalTest.class);
@@ -105,11 +107,10 @@ private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
     /**
      * Generate fence entry
      */
-    private ByteBuffer generateFenceEntry(long ledgerId) {
-        ByteBuffer bb = ByteBuffer.allocate(8 + 8);
-        bb.putLong(ledgerId);
-        bb.putLong(Bookie.METAENTRY_ID_FENCE_KEY);
-        bb.flip();
+    private ByteBuf generateFenceEntry(long ledgerId) {
+        ByteBuf bb = Unpooled.buffer();
+        bb.writeLong(ledgerId);
+        bb.writeLong(Bookie.METAENTRY_ID_FENCE_KEY);
         return bb;
     }
 
@@ -117,13 +118,12 @@ private ByteBuffer generateFenceEntry(long ledgerId) {
      * Generate meta entry with given master key
      */
     private ByteBuf generateMetaEntry(long ledgerId, byte[] masterKey) {
-        ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
-        bb.putLong(ledgerId);
-        bb.putLong(Bookie.METAENTRY_ID_LEDGER_KEY);
-        bb.putInt(masterKey.length);
-        bb.put(masterKey);
-        bb.flip();
-        return Unpooled.wrappedBuffer(bb);
+        ByteBuf bb = Unpooled.buffer();
+        bb.writeLong(ledgerId);
+        bb.writeLong(Bookie.METAENTRY_ID_LEDGER_KEY);
+        bb.writeInt(masterKey.length);
+        bb.writeBytes(masterKey);
+        return bb;
     }
 
     private void writeJunkJournal(File journalDir) throws Exception {
@@ -204,8 +204,8 @@ private JournalChannel writeV2Journal(File journalDir, int numEntries) throws Ex
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         bc.flush(true);
@@ -238,8 +238,8 @@ private JournalChannel writeV3Journal(File journalDir, int numEntries, byte[] ma
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         bc.flush(true);
@@ -271,15 +271,14 @@ private JournalChannel writeV4Journal(File journalDir, int numEntries, byte[] ma
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         // write fence key
-        ByteBuffer packet = generateFenceEntry(1);
-        ByteBuffer lenBuf = ByteBuffer.allocate(4);
-        lenBuf.putInt(packet.remaining());
-        lenBuf.flip();
+        ByteBuf packet = generateFenceEntry(1);
+        ByteBuf lenBuf = Unpooled.buffer();
+        lenBuf.writeInt(packet.readableBytes());
         bc.write(lenBuf);
         bc.write(packet);
         bc.flush(true);
@@ -293,8 +292,8 @@ private JournalChannel writeV5Journal(File journalDir, int numEntries, byte[] ma
 
         BufferedChannel bc = jc.getBufferedChannel();
 
-        ByteBuffer paddingBuff = ByteBuffer.allocateDirect(2 * JournalChannel.SECTOR_SIZE);
-        ZeroBuffer.put(paddingBuff);
+        ByteBuf paddingBuff = Unpooled.buffer();
+        paddingBuff.writeZero(2 * JournalChannel.SECTOR_SIZE);
         byte[] data = new byte[4 * 1024 * 1024];
         Arrays.fill(data, (byte)'X');
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
@@ -308,19 +307,17 @@ private JournalChannel writeV5Journal(File journalDir, int numEntries, byte[] ma
             }
             lastConfirmed = i;
             length += i;
-            ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.readableBytes());
-            lenBuff.flip();
+            ByteBuf lenBuff = Unpooled.buffer();
+            lenBuff.writeInt(packet.readableBytes());
             bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(packet);
             packet.release();
             Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
         }
         // write fence key
-        ByteBuffer packet = generateFenceEntry(1);
-        ByteBuffer lenBuf = ByteBuffer.allocate(4);
-        lenBuf.putInt(packet.remaining());
-        lenBuf.flip();
+        ByteBuf packet = generateFenceEntry(1);
+        ByteBuf lenBuf = Unpooled.buffer();
+        lenBuf.writeInt(packet.readableBytes());
         bc.write(lenBuf);
         bc.write(packet);
         Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
@@ -519,7 +516,7 @@ public void testJunkEndedJournal() throws Exception {
         Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
 
         JournalChannel jc = writeV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
-        jc.getBufferedChannel().write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
+        jc.getBufferedChannel().write(Unpooled.wrappedBuffer("JunkJunkJunk".getBytes()));
         jc.getBufferedChannel().flush(true);
 
         writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 8de2bfc98..144871cd6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -21,12 +21,21 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -35,15 +44,8 @@
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.bookkeeper.util.IOUtils;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -52,8 +54,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
 /**
  * LedgerCache related test cases
  */
@@ -499,7 +499,7 @@ boolean isSizeLimitReached() {
         }
 
         @Override
-        public void process(long ledgerId, long entryId, ByteBuffer buffer) throws IOException {
+        public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOException {
             if (injectFlushException.get()) {
                 throw new IOException("Injected Exception");
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index ec60799fc..7606c919a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -22,17 +22,20 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Test;
 import org.junit.Before;
+import org.junit.Test;
 
 public class TestEntryMemTable implements CacheCallback, SkipListFlusher, CheckpointSource {
 
@@ -97,7 +100,7 @@ public void onSizeLimitReached(Checkpoint cp) throws IOException {
         // No-op
     }
 
-    public void process(long ledgerId, long entryId, ByteBuffer entry)
+    public void process(long ledgerId, long entryId, ByteBuf entry)
             throws IOException {
         // No-op
     }
@@ -135,7 +138,7 @@ public void testScanAcrossSnapshot() throws IOException {
         }
 
         @Override
-        public void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+        public void process(long ledgerId, long entryId, ByteBuf entry) throws IOException {
             assertTrue(ledgerId + ":" + entryId + " is duplicate in store!",
                     keyValues.add(new EntryKeyValue(ledgerId, entryId, entry.array())));
         }
@@ -143,7 +146,7 @@ public void process(long ledgerId, long entryId, ByteBuffer entry) throws IOExce
 
     private class NoLedgerFLusher implements SkipListFlusher {
         @Override
-        public void process(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+        public void process(long ledgerId, long entryId, ByteBuf entry) throws IOException {
             throw new NoLedgerException(ledgerId);
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index fba9594ee..00e3d15d0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -24,6 +24,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -96,8 +97,8 @@ static JournalChannel writeJournal(File journalDir, int numEntries, byte[] maste
             lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
-            bc.write(lenBuff);
-            bc.write(packet.nioBuffer());
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
             packet.release();
         }
         bc.flush(true);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
index 824f72f94..270a60f94 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.nio.ByteBuffer;
@@ -121,4 +122,28 @@ public void testToByteBuffer() {
 
         assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer());
     }
+
+    /**
+     * Verify that readableBytes() returns writerIndex - readerIndex. In this case writerIndex is the end of the buffer
+     * and readerIndex is increased by 64.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testReadableBytes() throws Exception {
+        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b1.writerIndex(b1.capacity());
+        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        b2.writerIndex(b2.capacity());
+        ByteBuf buf = DoubleByteBuf.get(b1, b2);
+
+        assertEquals(buf.readerIndex(), 0);
+        assertEquals(buf.writerIndex(), 256);
+        assertEquals(buf.readableBytes(), 256);
+
+        for (int i = 0; i < 4; ++i) {
+            buf.skipBytes(64);
+            assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services