You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2017/05/15 22:32:01 UTC

bookkeeper git commit: BOOKKEEPER-1048: Use ByteBuf in LedgerStorage interface

Repository: bookkeeper
Updated Branches:
  refs/heads/master 5d43260e8 -> 0f81461d2


BOOKKEEPER-1048: Use ByteBuf in LedgerStorage interface

To pass a ref-counted buffer from Netty directly to the storage and the Journal, LedgerStorage needs to accept ByteBuf instead of ByteBuffer.

#### Note

This commit is on top of BOOKKEEPER-1048 / #138. Once that gets merged, I will rebase. Posting now to get a Jenkins run. Please review the last commit f53f772f79d0a334edc0f05e66edb7cc645b1ffa in this PR for now.

Author: Matteo Merli <mm...@apache.org>

Reviewers: Jia Zhai <None>, Sijie Guo <None>

Closes #139 from merlimat/bytebuf-in-ledger-storage


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/0f81461d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/0f81461d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/0f81461d

Branch: refs/heads/master
Commit: 0f81461d2d1dc5cf9db4de9a46599d7d64e3dac6
Parents: 5d43260
Author: Matteo Merli <mm...@apache.org>
Authored: Mon May 15 15:31:56 2017 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Mon May 15 15:31:56 2017 -0700

----------------------------------------------------------------------
 .../org/apache/bookkeeper/bookie/Bookie.java    | 61 +++++++++++---------
 .../apache/bookkeeper/bookie/EntryKeyValue.java |  7 ++-
 .../apache/bookkeeper/bookie/EntryMemTable.java |  2 +-
 .../org/apache/bookkeeper/bookie/FileInfo.java  | 21 ++++---
 .../bookkeeper/bookie/IndexPersistenceMgr.java  |  6 +-
 .../bookie/InterleavedLedgerStorage.java        | 33 ++++++-----
 .../org/apache/bookkeeper/bookie/Journal.java   | 33 +++++++----
 .../apache/bookkeeper/bookie/LedgerCache.java   |  6 +-
 .../bookkeeper/bookie/LedgerCacheImpl.java      |  6 +-
 .../bookkeeper/bookie/LedgerDescriptor.java     | 15 ++---
 .../bookkeeper/bookie/LedgerDescriptorImpl.java | 17 +++---
 .../bookie/LedgerDescriptorReadOnlyImpl.java    |  5 +-
 .../apache/bookkeeper/bookie/LedgerStorage.java | 12 ++--
 .../bookkeeper/bookie/SortedLedgerStorage.java  | 21 ++++---
 .../bookkeeper/proto/BookieProtoEncoding.java   | 35 +++++------
 .../apache/bookkeeper/proto/BookieProtocol.java |  9 +--
 .../bookkeeper/proto/ReadEntryProcessor.java    |  9 ++-
 .../bookkeeper/proto/ReadEntryProcessorV3.java  | 11 ++--
 .../bookkeeper/proto/ReadLacProcessorV3.java    | 15 +++--
 .../bookkeeper/proto/ResponseBuilder.java       |  8 +--
 .../bookkeeper/proto/WriteEntryProcessor.java   |  6 +-
 .../bookkeeper/proto/WriteEntryProcessorV3.java |  5 +-
 .../bookkeeper/proto/WriteLacProcessorV3.java   |  3 +-
 .../bookkeeper/bookie/BookieJournalTest.java    | 15 ++---
 .../bookkeeper/bookie/CompactionTest.java       | 18 +++---
 .../apache/bookkeeper/bookie/EntryLogTest.java  | 42 +++++++-------
 .../bookkeeper/bookie/LedgerCacheTest.java      | 20 ++++---
 .../bookkeeper/bookie/TestSyncThread.java       | 17 +++---
 .../bookkeeper/client/BookKeeperCloseTest.java  |  9 ++-
 .../bookkeeper/client/LedgerCloseTest.java      |  8 ++-
 .../bookkeeper/client/LedgerRecoveryTest.java   | 10 ++--
 .../apache/bookkeeper/meta/GcLedgersTest.java   | 12 ++--
 .../bookkeeper/meta/LedgerManagerTestCase.java  | 12 ++--
 .../proto/TestPerChannelBookieClient.java       |  2 +-
 .../replication/AuditorPeriodicCheckTest.java   |  4 +-
 .../bookkeeper/test/ConcurrentLedgerTest.java   | 15 +++--
 36 files changed, 292 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index c743ef4..090574c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -49,8 +51,10 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirExcepti
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.jmx.BKMBeanRegistry;
+
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -68,6 +72,7 @@ import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.io.FileUtils;
+
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -75,6 +80,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -484,7 +490,7 @@ public class Bookie extends BookieCriticalThread {
             Versioned<Cookie> zkCookie = null;
             try {
                 zkCookie = Cookie.readFromZooKeeper(zk, conf);
-                // If allowStorageExpansion option is set, we should 
+                // If allowStorageExpansion option is set, we should
                 // make sure that the new set of ledger/index dirs
                 // is a super set of the old; else, we fail the cookie check
                 masterCookie.verifyIsSuperSet(zkCookie.getValue());
@@ -773,7 +779,7 @@ public class Bookie extends BookieCriticalThread {
                         LedgerDescriptor handle = handles.getHandle(ledgerId, key);
 
                         recBuff.rewind();
-                        handle.addEntry(recBuff);
+                        handle.addEntry(Unpooled.wrappedBuffer(recBuff));
                     }
                 } catch (NoLedgerException nsle) {
                     LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId);
@@ -1346,9 +1352,10 @@ public class Bookie extends BookieCriticalThread {
      *
      * @throws BookieException if masterKey does not match the master key of the ledger
      */
-    private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, final byte[] masterKey)
+    private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
             throws IOException, BookieException {
-        final long ledgerId = entry.getLong();
+        final long ledgerId = entry.getLong(entry.readerIndex());
+
         LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
         if (masterKeyCache.get(ledgerId) == null) {
             // Force the load into masterKey cache
@@ -1376,16 +1383,16 @@ public class Bookie extends BookieCriticalThread {
     /**
      * Add an entry to a ledger as specified by handle.
      */
-    private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, WriteCallback cb, Object ctx)
+    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallback cb, Object ctx)
             throws IOException, BookieException {
         long ledgerId = handle.getLedgerId();
-        entry.rewind();
         long entryId = handle.addEntry(entry);
 
-        entry.rewind();
-        writeBytes.add(entry.remaining());
+        writeBytes.add(entry.readableBytes());
 
-        LOG.trace("Adding {}@{}", entryId, ledgerId);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Adding {}@{}", entryId, ledgerId);
+        }
         getJournal(ledgerId).logAddEntry(entry, cb, ctx);
     }
 
@@ -1395,7 +1402,7 @@ public class Bookie extends BookieCriticalThread {
      * so that they exist on a quorum of bookies. The corresponding client side call for this
      * is not exposed to users.
      */
-    public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+    public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
@@ -1403,7 +1410,7 @@ public class Bookie extends BookieCriticalThread {
         try {
             LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
             synchronized (handle) {
-                entrySize = entry.remaining();
+                entrySize = entry.readableBytes();
                 addEntryInternal(handle, entry, cb, ctx);
             }
             success = true;
@@ -1419,15 +1426,16 @@ public class Bookie extends BookieCriticalThread {
                 recoveryAddEntryStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
                 addBytesStats.registerFailedValue(entrySize);
             }
+
+            entry.release();
         }
     }
 
-    public void setExplicitLac(ByteBuffer entry, Object ctx, byte[] masterKey)
+    public void setExplicitLac(ByteBuf entry, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
         try {
-            long ledgerId = entry.getLong();
+            long ledgerId = entry.getLong(entry.readerIndex());
             LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
-            entry.rewind();
             synchronized (handle) {
                 handle.setExplicitLac(entry);
             }
@@ -1437,8 +1445,8 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
-    public ByteBuffer getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException {
-        ByteBuffer lac;
+    public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException {
+        ByteBuf lac;
         LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
         synchronized (handle) {
             lac = handle.getExplicitLac();
@@ -1450,7 +1458,7 @@ public class Bookie extends BookieCriticalThread {
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
-    public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+    public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
@@ -1462,7 +1470,7 @@ public class Bookie extends BookieCriticalThread {
                     throw BookieException
                             .create(BookieException.Code.LedgerFencedException);
                 }
-                entrySize = entry.remaining();
+                entrySize = entry.readableBytes();
                 addEntryInternal(handle, entry, cb, ctx);
             }
             success = true;
@@ -1478,6 +1486,8 @@ public class Bookie extends BookieCriticalThread {
                 addEntryStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
                 addBytesStats.registerFailedValue(entrySize);
             }
+
+            entry.release();
         }
     }
 
@@ -1511,7 +1521,7 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
-    public ByteBuffer readEntry(long ledgerId, long entryId)
+    public ByteBuf readEntry(long ledgerId, long entryId)
             throws IOException, NoLedgerException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
@@ -1519,9 +1529,8 @@ public class Bookie extends BookieCriticalThread {
         try {
             LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
             LOG.trace("Reading {}@{}", entryId, ledgerId);
-            ByteBuffer entry = handle.readEntry(entryId);
-            entrySize = entry.remaining();
-            readBytes.add(entrySize);
+            ByteBuf entry = handle.readEntry(entryId);
+            readBytes.add(entry.readableBytes());
             success = true;
             return entry;
         } finally {
@@ -1667,11 +1676,9 @@ public class Bookie extends BookieCriticalThread {
         CounterCallback cb = new CounterCallback();
         long start = MathUtils.now();
         for (int i = 0; i < 100000; i++) {
-            ByteBuffer buff = ByteBuffer.allocate(1024);
-            buff.putLong(1);
-            buff.putLong(i);
-            buff.limit(1024);
-            buff.position(0);
+            ByteBuf buff = Unpooled.buffer(1024);
+            buff.writeLong(1);
+            buff.writeLong(i);
             cb.incCount();
             b.addEntry(buff, cb, null, new byte[0]);
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
index dab5396..42a1f34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
@@ -19,6 +19,9 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -82,8 +85,8 @@ public class EntryKeyValue extends EntryKey {
     *
     * @return the value
     */
-    public ByteBuffer getValueAsByteBuffer() {
-        return ByteBuffer.wrap(getBuffer(), getOffset(), getLength());
+    public ByteBuf getValueAsByteBuffer() {
+        return Unpooled.wrappedBuffer(getBuffer(), getOffset(), getLength());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index ff14d03..214b286 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -251,7 +251,7 @@ public class EntryMemTable {
                         ledger = kv.getLedgerId();
                         if (ledgerGC != ledger) {
                             try {
-                                flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
+                                flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer().nioBuffer());
                             } catch (NoLedgerException exception) {
                                 ledgerGC = ledger;
                             }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index adf148c..90f731a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -36,6 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 /**
  * This is the file handle for a ledger's index file that maps entry ids to location.
  * It is used by LedgerCache.
@@ -114,31 +117,31 @@ class FileInfo {
         return sizeSinceLastwrite;
     }
 
-    public ByteBuffer getExplicitLac() {
-        ByteBuffer retLac = null;
+    public ByteBuf getExplicitLac() {
+        ByteBuf retLac = null;
         synchronized(this) {
             LOG.debug("fileInfo:GetLac: {}", explicitLac);
             if (explicitLac != null) {
-                retLac = ByteBuffer.allocate(explicitLac.capacity());
+                retLac = Unpooled.buffer(explicitLac.capacity());
                 explicitLac.rewind();//copy from the beginning
-                retLac.put(explicitLac);
+                retLac.writeBytes(explicitLac);
                 explicitLac.rewind();
-                retLac.flip();
+                return retLac;
             }
         }
         return retLac;
     }
 
-    public void setExplicitLac(ByteBuffer lac) {
+    public void setExplicitLac(ByteBuf lac) {
         synchronized(this) {
             if (explicitLac == null) {
                 explicitLac = ByteBuffer.allocate(lac.capacity());
             }
-            explicitLac.put(lac);
+            lac.readBytes(explicitLac);
             explicitLac.rewind();
-            
+
             // skip the ledger id
-            explicitLac.getLong();            
+            explicitLac.getLong();
             long explicitLacValue = explicitLac.getLong();
             setLastAddConfirmed(explicitLacValue);
             explicitLac.rewind();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 1ea000c..708bdb3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.netty.buffer.ByteBuf;
+
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS;
 
@@ -389,7 +391,7 @@ public class IndexPersistenceMgr {
         }
     }
 
-    void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+    void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
         FileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
@@ -402,7 +404,7 @@ public class IndexPersistenceMgr {
         }
     }
 
-    public ByteBuffer getExplicitLac(long ledgerId) {
+    public ByteBuf getExplicitLac(long ledgerId) {
         FileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index afd65dc..61a81c4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -21,6 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -219,11 +222,13 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         return ledgerCache.isFenced(ledgerId);
     }
 
-    public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
         ledgerCache.setExplicitLac(ledgerId, lac);
     }
 
-    public ByteBuffer getExplicitLac(long ledgerId) {
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
         return ledgerCache.getExplicitLac(ledgerId);
     }
 
@@ -246,13 +251,13 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     public long getLastAddConfirmed(long ledgerId) throws IOException {
         Long lac = ledgerCache.getLastAddConfirmed(ledgerId);
         if (lac == null) {
-            ByteBuffer bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
             if (null == bb) {
                 return BookieProtocol.INVALID_ENTRY_ID;
             } else {
-                bb.getLong(); // ledger id
-                bb.getLong(); // entry id
-                lac = bb.getLong();
+                bb.readLong(); // ledger id
+                bb.readLong(); // entry id
+                lac = bb.readLong();
                 lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
             }
         }
@@ -260,21 +265,19 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     }
 
     @Override
-    synchronized public long addEntry(ByteBuffer entry) throws IOException {
-        long ledgerId = entry.getLong();
-        long entryId = entry.getLong();
-        long lac = entry.getLong();
-        entry.rewind();
+    synchronized public long addEntry(ByteBuf entry) throws IOException {
+        long ledgerId = entry.getLong(entry.readerIndex() + 0);
+        long entryId = entry.getLong(entry.readerIndex() + 8);
+        long lac = entry.getLong(entry.readerIndex() + 16);
 
-        processEntry(ledgerId, entryId, entry);
+        processEntry(ledgerId, entryId, entry.nioBuffer());
 
         ledgerCache.updateLastAddConfirmed(ledgerId, lac);
-
         return entryId;
     }
 
     @Override
-    public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
         long offset;
         /*
          * If entryId is BookieProtocol.LAST_ADD_CONFIRMED, then return the last written.
@@ -305,7 +308,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         try {
             byte[] retBytes = entryLogger.readEntry(ledgerId, entryId, offset);
             success = true;
-            return ByteBuffer.wrap(retBytes);
+            return Unpooled.wrappedBuffer(retBytes);
         } finally {
             if (success) {
                 getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a39c3fa..c679ee9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -21,6 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -36,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
+
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -272,14 +276,14 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
      * Journal Entry to Record
      */
     private class QueueEntry implements Runnable {
-        ByteBuffer entry;
+        ByteBuf entry;
         long ledgerId;
         long entryId;
         WriteCallback cb;
         Object ctx;
         long enqueueTime;
 
-        QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
+        QueueEntry(ByteBuf entry, long ledgerId, long entryId,
                    WriteCallback cb, Object ctx, long enqueueTime) {
             this.entry = entry.duplicate();
             this.cb = cb;
@@ -750,14 +754,20 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         }
     }
 
+    public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
+        logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx);
+    }
+
     /**
      * record an add entry operation in journal
      */
-    public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
-        long ledgerId = entry.getLong();
-        long entryId = entry.getLong();
-        entry.rewind();
+    public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) {
+        long ledgerId = entry.getLong(entry.readerIndex() + 0);
+        long entryId = entry.getLong(entry.readerIndex() + 8);
         journalQueueSize.inc();
+
+        //Retain entry until it gets written to journal
+        entry.retain();
         queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano()));
     }
 
@@ -927,24 +937,25 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                     continue;
                 }
 
-                journalWriteBytes.add(qe.entry.remaining());
+                journalWriteBytes.add(qe.entry.readableBytes());
                 journalQueueSize.dec();
 
-                batchSize += (4 + qe.entry.remaining());
+                batchSize += (4 + qe.entry.readableBytes());
 
                 lenBuff.clear();
-                lenBuff.putInt(qe.entry.remaining());
+                lenBuff.putInt(qe.entry.readableBytes());
                 lenBuff.flip();
 
                 // preAlloc based on size
-                logFile.preAllocIfNeeded(4 + qe.entry.remaining());
+                logFile.preAllocIfNeeded(4 + qe.entry.readableBytes());
 
                 //
                 // we should be doing the following, but then we run out of
                 // direct byte buffers
                 // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
                 bc.write(lenBuff);
-                bc.write(qe.entry);
+                bc.write(qe.entry.nioBuffer());
+                qe.entry.release();
 
                 toFlush.add(qe);
                 qe = null;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index e004cb6..efb67dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -25,6 +25,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * This class maps a ledger entry number into a location (entrylogid, offset) in
  * an entry log file. It does user level caching to more efficiently manage disk
@@ -51,6 +53,6 @@ interface LedgerCache extends Closeable {
     void deleteLedger(long ledgerId) throws IOException;
 
     LedgerCacheBean getJMXBean();
-    void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException;
-    ByteBuffer getExplicitLac(long ledgerId);
+    void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException;
+    ByteBuf getExplicitLac(long ledgerId);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index cece79f..515b64b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -31,6 +31,8 @@ import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * Implementation of LedgerCache interface.
  * This class serves two purposes.
@@ -137,11 +139,11 @@ public class LedgerCacheImpl implements LedgerCache {
         return indexPersistenceManager.isFenced(ledgerId);
     }
 
-    public void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+    public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
         indexPersistenceManager.setExplicitLac(ledgerId, lac);
     }
 
-    public ByteBuffer getExplicitLac(long ledgerId) {
+    public ByteBuf getExplicitLac(long ledgerId) {
         return indexPersistenceManager.getExplicitLac(ledgerId);
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index bcb0c30..9fe1629 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -21,12 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import io.netty.buffer.ByteBuf;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
 /**
  * Implements a ledger inside a bookie. In particular, it implements operations
@@ -57,12 +54,12 @@ public abstract class LedgerDescriptor {
     abstract boolean setFenced() throws IOException;
     abstract boolean isFenced() throws IOException;
 
-    abstract long addEntry(ByteBuffer entry) throws IOException;
-    abstract ByteBuffer readEntry(long entryId) throws IOException;
+    abstract long addEntry(ByteBuf entry) throws IOException;
+    abstract ByteBuf readEntry(long entryId) throws IOException;
 
     abstract long getLastAddConfirmed() throws IOException;
 
-    abstract void setExplicitLac(ByteBuffer entry) throws IOException;
+    abstract void setExplicitLac(ByteBuf entry) throws IOException;
 
-    abstract  ByteBuffer getExplicitLac();
+    abstract  ByteBuf getExplicitLac();
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index bf1c129..6602392 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -21,8 +21,10 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import org.slf4j.Logger;
@@ -69,28 +71,27 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
     }
 
     @Override
-    void setExplicitLac(ByteBuffer lac) throws IOException {
+    void setExplicitLac(ByteBuf lac) throws IOException {
         ledgerStorage.setExplicitlac(ledgerId, lac);
     }
 
     @Override
-    ByteBuffer getExplicitLac() {
+    ByteBuf getExplicitLac() {
         return ledgerStorage.getExplicitLac(ledgerId);
     }
-    @Override
-    long addEntry(ByteBuffer entry) throws IOException {
-        long ledgerId = entry.getLong();
+
+    long addEntry(ByteBuf entry) throws IOException {
+        long ledgerId = entry.getLong(entry.readerIndex());
 
         if (ledgerId != this.ledgerId) {
             throw new IOException("Entry for ledger " + ledgerId + " was sent to " + this.ledgerId);
         }
-        entry.rewind();
 
         return ledgerStorage.addEntry(entry);
     }
 
     @Override
-    ByteBuffer readEntry(long entryId) throws IOException {
+    ByteBuf readEntry(long entryId) throws IOException {
         return ledgerStorage.getEntry(ledgerId, entryId);
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
index 29dcfaf..40bf988 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
@@ -21,8 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 /**
  * Implements a ledger inside a bookie. In particular, it implements operations
  * to write entries to a ledger and read entries from a ledger.
@@ -39,7 +40,7 @@ public class LedgerDescriptorReadOnlyImpl extends LedgerDescriptorImpl {
     }
 
     @Override
-    long addEntry(ByteBuffer entry) throws IOException {
+    long addEntry(ByteBuf entry) throws IOException {
         assert false;
         throw new IOException("Invalid action on read only descriptor");
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index fbdd6b9..4587460 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -21,13 +21,13 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.meta.LedgerManager;
 
@@ -101,12 +101,12 @@ public interface LedgerStorage {
      * Add an entry to the storage.
      * @return the entry id of the entry added
      */
-    long addEntry(ByteBuffer entry) throws IOException;
+    long addEntry(ByteBuf entry) throws IOException;
 
     /**
      * Read an entry from storage
      */
-    ByteBuffer getEntry(long ledgerId, long entryId) throws IOException;
+    ByteBuf getEntry(long ledgerId, long entryId) throws IOException;
 
     /**
      * Get last add confirmed.
@@ -162,7 +162,7 @@ public interface LedgerStorage {
      */
     BKMBeanInfo getJMXBean();
 
-    void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException;
+    void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException;
 
-    ByteBuffer getExplicitLac(long ledgerId);
+    ByteBuf getExplicitLac(long ledgerId);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 698dbd3..105a8b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
@@ -27,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -93,12 +96,12 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
     }
 
     @Override
-    public long addEntry(ByteBuffer entry) throws IOException {
-        long ledgerId = entry.getLong();
-        long entryId = entry.getLong();
-        long lac = entry.getLong();
-        entry.rewind();
-        memTable.addEntry(ledgerId, entryId, entry, this);
+    public long addEntry(ByteBuf entry) throws IOException {
+        long ledgerId = entry.getLong(entry.readerIndex() + 0);
+        long entryId = entry.getLong(entry.readerIndex() + 8);
+        long lac = entry.getLong(entry.readerIndex() + 16);
+
+        memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this);
         ledgerCache.updateLastAddConfirmed(ledgerId, lac);
         return entryId;
     }
@@ -108,7 +111,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
      * @param ledgerId
      * @return
      */
-    private ByteBuffer getLastEntryId(long ledgerId) throws IOException {
+    private ByteBuf getLastEntryId(long ledgerId) throws IOException {
         EntryKeyValue kv = memTable.getLastEntry(ledgerId);
         if (null != kv) {
             return kv.getValueAsByteBuffer();
@@ -118,11 +121,11 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
     }
 
     @Override
-    public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
         if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
             return getLastEntryId(ledgerId);
         }
-        ByteBuffer buffToRet;
+        ByteBuf buffToRet;
         try {
             buffToRet = super.getEntry(ledgerId, entryId);
         } catch (Bookie.NoEntryException nee) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 148b31d..b1f86ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,17 +20,6 @@
  */
 package org.apache.bookkeeper.proto;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.handler.codec.MessageToMessageEncoder;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -44,6 +33,17 @@ import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
 public class BookieProtoEncoding {
     private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
@@ -146,10 +146,8 @@ public class BookieProtoEncoding {
                 packet.readBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
 
                 // Read ledger and entry id without advancing the reader index
-                packet.markReaderIndex();
-                ledgerId = packet.readLong();
-                entryId = packet.readLong();
-                packet.resetReaderIndex();
+                ledgerId = packet.getLong(packet.readerIndex());
+                entryId = packet.getLong(packet.readerIndex() + 8);
                 return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId, flags, masterKey, packet.retain());
             case BookieProtocol.READENTRY:
                 ledgerId = packet.readLong();
@@ -169,6 +167,7 @@ public class BookieProtoEncoding {
                 builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry);
                 return new BookieProtocol.AuthRequest(h.getVersion(), builder.build());
             }
+
             return packet;
         }
     }
@@ -238,8 +237,9 @@ public class BookieProtoEncoding {
                 entryId = buffer.readLong();
 
                 if (rc == BookieProtocol.EOK) {
+                    ByteBuf content = buffer.slice();
                     return new BookieProtocol.ReadResponse(header.getVersion(), rc,
-                                                           ledgerId, entryId, buffer.slice());
+                                                           ledgerId, entryId, content.retain());
                 } else {
                     return new BookieProtocol.ReadResponse(header.getVersion(), rc,
                                                            ledgerId, entryId);
@@ -327,9 +327,6 @@ public class BookieProtoEncoding {
 
         @Override
         protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Encode request {} to channel {}.", msg, ctx.channel());
-            }
             if (msg instanceof BookkeeperProtocol.Request) {
                 out.add(REQ_V3.encode(msg, ctx.alloc()));
             } else if (msg instanceof BookieProtocol.Request) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 094daab..f0cfa58 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -22,8 +22,7 @@ package org.apache.bookkeeper.proto;
  */
 
 import io.netty.buffer.ByteBuf;
-
-import java.nio.ByteBuffer;
+import io.netty.buffer.Unpooled;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 
@@ -262,10 +261,6 @@ public interface BookieProtocol {
             return data;
         }
 
-        ByteBuffer getDataAsByteBuffer() {
-            return data.nioBuffer().slice();
-        }
-
         boolean isRecoveryAdd() {
             return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
         }
@@ -351,7 +346,7 @@ public interface BookieProtocol {
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
             super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
-            this.data = null;
+            this.data = Unpooled.EMPTY_BUFFER;
         }
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index bd98374..02fa1c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -17,10 +17,11 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -49,7 +50,7 @@ class ReadEntryProcessor extends PacketProcessorBase {
         LOG.debug("Received new read request: {}", request);
         int errorCode = BookieProtocol.EIO;
         long startTimeNanos = MathUtils.nowInNano();
-        ByteBuffer data = null;
+        ByteBuf data = null;
         try {
             Future<Boolean> fenceResult = null;
             if (read.isFencingRequest()) {
@@ -63,7 +64,7 @@ class ReadEntryProcessor extends PacketProcessorBase {
                 }
             }
             data = requestProcessor.bookie.readEntry(request.getLedgerId(), request.getEntryId());
-            LOG.debug("##### Read entry ##### {}", data.remaining());
+            LOG.debug("##### Read entry ##### {} -- ref-count: {}", data.readableBytes(), data.refCnt());
             if (null != fenceResult) {
                 // TODO:
                 // currently we don't have readCallback to run in separated read
@@ -127,6 +128,8 @@ class ReadEntryProcessor extends PacketProcessorBase {
                          requestProcessor.readRequestStats);
 
         } else {
+            ReferenceCountUtil.release(data);
+
             requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
             sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, read),

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index fbfa71f..b04f6b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -17,10 +17,11 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -65,7 +66,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
 
         LOG.debug("Received new read request: {}", request);
         StatusCode status;
-        ByteBuffer entryBody;
+        ByteBuf entryBody = null;
         try {
             Future<Boolean> fenceResult = null;
             if (readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) {
@@ -98,7 +99,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
                         status = StatusCode.EIO;
                     } else {
                         status = StatusCode.EOK;
-                        readResponse.setBody(ByteString.copyFrom(entryBody));
+                        readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
                     }
                 } catch (InterruptedException ie) {
                     LOG.error("Interrupting fence read entry (lid: {}, eid: {})",
@@ -114,7 +115,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
                     status = StatusCode.EIO;
                 }
             } else {
-                readResponse.setBody(ByteString.copyFrom(entryBody));
+                readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
                 status = StatusCode.EOK;
             }
         } catch (Bookie.NoLedgerException e) {
@@ -142,6 +143,8 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
                     TimeUnit.NANOSECONDS);
         }
 
+        ReferenceCountUtil.release(entryBody);
+
         // Finally set status and return. The body would have been updated if
         // a read went through.
         readResponse.setStatus(status);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 0fbdbec..e9cc1cb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -21,7 +21,6 @@
 package org.apache.bookkeeper.proto;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -36,7 +35,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.util.ReferenceCountUtil;
 
 class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private final static Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class);
@@ -61,14 +62,14 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
 
         logger.debug("Received ReadLac request: {}", request);
         StatusCode status = StatusCode.EOK;
-        ByteBuffer lastEntry;
-        ByteBuffer lac;
+        ByteBuf lastEntry = null;
+        ByteBuf lac = null;
         try {
             lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
             lac = requestProcessor.bookie.getExplicitLac(ledgerId);
             if (lac != null) {
-                readLacResponse.setLacBody(ByteString.copyFrom(lac));
-                readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry));
+                readLacResponse.setLacBody(ByteString.copyFrom(lac.nioBuffer()));
+                readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer()));
             } else {
                 status = StatusCode.ENOENTRY;
             }
@@ -78,7 +79,11 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
         } catch (IOException e) {
             status = StatusCode.EIO;
             logger.error("IOException while performing readLac from ledger: {}", ledgerId);
+        } finally {
+            ReferenceCountUtil.release(lastEntry);
+            ReferenceCountUtil.release(lac);
         }
+
         if (status == StatusCode.EOK) {
             requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index 1418437..c0be162 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -20,9 +20,7 @@
  */
 package org.apache.bookkeeper.proto;
 
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
 
 class ResponseBuilder {
     static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) {
@@ -41,8 +39,8 @@ class ResponseBuilder {
                                               r.getEntryId());
     }
 
-    static BookieProtocol.Response buildReadResponse(ByteBuffer data, BookieProtocol.Request r) {
+    static BookieProtocol.Response buildReadResponse(ByteBuf data, BookieProtocol.Request r) {
         return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK,
-                r.getLedgerId(), r.getEntryId(), Unpooled.wrappedBuffer(data));
+                r.getLedgerId(), r.getEntryId(), data);
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 46f7f7d..827aed9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -62,11 +62,9 @@ class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback {
         int rc = BookieProtocol.EOK;
         try {
             if (add.isRecoveryAdd()) {
-                requestProcessor.bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
-                                                         this, channel, add.getMasterKey());
+                requestProcessor.bookie.recoveryAddEntry(add.getData(), this, channel, add.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(add.getDataAsByteBuffer(),
-                                                 this, channel, add.getMasterKey());
+                requestProcessor.bookie.addEntry(add.getData(), this, channel, add.getMasterKey());
             }
         } catch (IOException e) {
             LOG.error("Error writing " + add, e);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index e34e894..b4e89f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -20,10 +20,11 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
@@ -102,7 +103,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
         };
         StatusCode status = null;
         byte[] masterKey = addRequest.getMasterKey().toByteArray();
-        ByteBuffer entryToAdd = addRequest.getBody().asReadOnlyByteBuffer();
+        ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
         try {
             if (addRequest.hasFlag() && addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
                 requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 097a573..e8ffb34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 
 class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
@@ -69,7 +70,7 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
         byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
 
         try {
-            requestProcessor.bookie.setExplicitLac(lacToAdd, channel, masterKey);
+            requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), channel, masterKey);
             status = StatusCode.EOK;
         } catch (IOException e) {
             logger.error("Error saving lac for ledger:{}",

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 1dfa32e..711d38b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.Test;
 import org.junit.After;
+
 import static org.junit.Assert.*;
 
 public class BookieJournalTest {
@@ -620,15 +621,15 @@ public class BookieJournalTest {
         b.readEntry(1, 99);
 
         // still able to read last entry, but it's junk
-        ByteBuffer buf = b.readEntry(1, 100);
-        assertEquals("Ledger Id is wrong", buf.getLong(), 1);
-        assertEquals("Entry Id is wrong", buf.getLong(), 100);
-        assertEquals("Last confirmed is wrong", buf.getLong(), 99);
-        assertEquals("Length is wrong", buf.getLong(), 100*1024);
-        buf.getLong(); // skip checksum
+        ByteBuf buf = b.readEntry(1, 100);
+        assertEquals("Ledger Id is wrong", buf.readLong(), 1);
+        assertEquals("Entry Id is wrong", buf.readLong(), 100);
+        assertEquals("Last confirmed is wrong", buf.readLong(), 99);
+        assertEquals("Length is wrong", buf.readLong(), 100*1024);
+        buf.readLong(); // skip checksum
         boolean allX = true;
         for (int i = 0; i < 1024; i++) {
-            byte x = buf.get();
+            byte x = buf.readByte();
             allX = allX && x == (byte)'X';
         }
         assertFalse("Some of buffer should have been zeroed", allX);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 6ae5e60..067b411 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -20,9 +20,11 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -51,6 +53,7 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -571,14 +574,13 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         storage.gcThread.doCompactEntryLogs(threshold);
     }
 
-    private ByteBuffer genEntry(long ledger, long entry, int size) {
-        ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-        bb.putLong(ledger);
-        bb.putLong(entry);
-        while (bb.hasRemaining()) {
-            bb.put((byte)0xFF);
+    private ByteBuf genEntry(long ledger, long entry, int size) {
+        ByteBuf bb = Unpooled.buffer(size);
+        bb.writeLong(ledger);
+        bb.writeLong(entry);
+        while (bb.isWritable()) {
+            bb.writeByte((byte)0xFF);
         }
-        bb.flip();
         return bb;
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 4e1004c..7eac1a2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -20,9 +20,12 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -72,9 +75,9 @@ public class EntryLogTest {
         Bookie bookie = new Bookie(conf);
         // create some entries
         EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
-        logger.addEntry(1, generateEntry(1, 1));
-        logger.addEntry(3, generateEntry(3, 1));
-        logger.addEntry(2, generateEntry(2, 1));
+        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+        logger.addEntry(3, generateEntry(3, 1).nioBuffer());
+        logger.addEntry(2, generateEntry(2, 1).nioBuffer());
         logger.flush();
         // now lets truncate the file to corrupt the last entry, which simulates a partial write
         File f = new File(curDir, "0.log");
@@ -91,13 +94,12 @@ public class EntryLogTest {
         assertNotNull(meta.getLedgersMap().get(3L));
     }
 
-    private ByteBuffer generateEntry(long ledger, long entry) {
+    private ByteBuf generateEntry(long ledger, long entry) {
         byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
-        ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
-        bb.putLong(ledger);
-        bb.putLong(entry);
-        bb.put(data);
-        bb.flip();
+        ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
+        bb.writeLong(ledger);
+        bb.writeLong(entry);
+        bb.writeBytes(data);
         return bb;
     }
 
@@ -120,7 +122,7 @@ public class EntryLogTest {
             EntryLogger logger = new EntryLogger(conf,
                     bookie.getLedgerDirsManager());
             for (int j=0; j<numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j));
+                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
         }
@@ -135,7 +137,7 @@ public class EntryLogTest {
             EntryLogger logger = new EntryLogger(conf,
                     bookie.getLedgerDirsManager());
             for (int j=0; j<numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j));
+                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
         }
@@ -231,10 +233,10 @@ public class EntryLogTest {
 
         // create some entries
         EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
-        logger.addEntry(1, generateEntry(1, 1));
-        logger.addEntry(3, generateEntry(3, 1));
-        logger.addEntry(2, generateEntry(2, 1));
-        logger.addEntry(1, generateEntry(1, 2));
+        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+        logger.addEntry(3, generateEntry(3, 1).nioBuffer());
+        logger.addEntry(2, generateEntry(2, 1).nioBuffer());
+        logger.addEntry(1, generateEntry(1, 2).nioBuffer());
         logger.rollLog();
         logger.flushRotatedLogs();
 
@@ -265,10 +267,10 @@ public class EntryLogTest {
 
         // create some entries
         EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
-        logger.addEntry(1, generateEntry(1, 1));
-        logger.addEntry(3, generateEntry(3, 1));
-        logger.addEntry(2, generateEntry(2, 1));
-        logger.addEntry(1, generateEntry(1, 2));
+        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+        logger.addEntry(3, generateEntry(3, 1).nioBuffer());
+        logger.addEntry(2, generateEntry(2, 1).nioBuffer());
+        logger.addEntry(1, generateEntry(1, 2).nioBuffer());
         logger.rollLog();
 
         // Rewrite the entry log header to be on V0 format

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index b63806e..41ab89c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -21,6 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -330,7 +333,7 @@ public class LedgerCacheTest {
         Bookie b = new Bookie(conf);
         b.start();
         for (int i = 1; i <= numLedgers; i++) {
-            ByteBuffer packet = generateEntry(i, 1);
+            ByteBuf packet = generateEntry(i, 1);
             b.addEntry(packet, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
         }
 
@@ -511,7 +514,7 @@ public class LedgerCacheTest {
         // this bookie.addEntry call is required. FileInfo for Ledger 1 would be created with this call.
         // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls will fail because of BOOKKEEPER-965 change.
         bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), null, "passwd".getBytes());
-        
+
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         assertFalse("Bookie is expected to be in ReadWrite mode", bookie.isReadOnly());
         assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
@@ -535,14 +538,13 @@ public class LedgerCacheTest {
         assertTrue("EntryMemTable SnapShot is expected to be empty, because of successful flush",
                 memTable.snapshot.isEmpty());
     }
-    
-    private ByteBuffer generateEntry(long ledger, long entry) {
+
+    private ByteBuf generateEntry(long ledger, long entry) {
         byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
-        ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
-        bb.putLong(ledger);
-        bb.putLong(entry);
-        bb.put(data);
-        bb.flip();
+        ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
+        bb.writeLong(ledger);
+        bb.writeLong(entry);
+        bb.writeBytes(data);
         return bb;
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 3672985..9352db3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -20,10 +20,10 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Callable;
@@ -35,17 +35,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
-
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
-
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
@@ -314,12 +311,12 @@ public class TestSyncThread {
         }
 
         @Override
-        public long addEntry(ByteBuffer entry) throws IOException {
+        public long addEntry(ByteBuf entry) throws IOException {
             return 1L;
         }
 
         @Override
-        public ByteBuffer getEntry(long ledgerId, long entryId)
+        public ByteBuf getEntry(long ledgerId, long entryId)
                 throws IOException {
             return null;
         }
@@ -334,11 +331,11 @@ public class TestSyncThread {
         }
 
         @Override
-        public void setExplicitlac(long ledgerId, ByteBuffer lac) {
+        public void setExplicitlac(long ledgerId, ByteBuf lac) {
         }
 
         @Override
-        public ByteBuffer getExplicitLac(long ledgerId) {
+        public ByteBuf getExplicitLac(long ledgerId) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index d781d5d..41a8ecd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -39,6 +39,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.util.concurrent.SettableFuture;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Enumeration;
@@ -72,7 +75,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
 
         Bookie delayBookie = new Bookie(conf) {
                 @Override
-                public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb,
+                public void recoveryAddEntry(ByteBuf entry, WriteCallback cb,
                                              Object ctx, byte[] masterKey)
                         throws IOException, BookieException {
                     try {
@@ -86,7 +89,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
                 }
 
                 @Override
-                public void addEntry(ByteBuffer entry, WriteCallback cb,
+                public void addEntry(ByteBuf entry, WriteCallback cb,
                                      Object ctx, byte[] masterKey)
                         throws IOException, BookieException {
                     try {
@@ -100,7 +103,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
                 }
 
                 @Override
-                public ByteBuffer readEntry(long ledgerId, long entryId)
+                public ByteBuf readEntry(long ledgerId, long entryId)
                         throws IOException, NoLedgerException {
                     try {
                         Thread.sleep(5000);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index a7bcf77..0c470a2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -194,7 +196,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
             throws Exception {
         Bookie sBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();
@@ -204,7 +206,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
             }
 
             @Override
-            public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 throw new IOException("Dead bookie for recovery adds.");
             }
@@ -218,7 +220,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
     private void startDeadBookie(ServerConfiguration conf, final CountDownLatch latch) throws Exception {
         Bookie dBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index f54cde1..5f7c56f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.client;
  *
  */
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
@@ -186,7 +188,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
 
         Bookie fakeBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 // drop request to simulate a slow and failed bookie
             }
@@ -245,7 +247,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
         ServerConfiguration conf = newServerConfiguration();
         Bookie deadBookie1 = new Bookie(conf) {
             @Override
-            public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 // drop request to simulate a slow and failed bookie
                 throw new IOException("Couldn't write for some reason");
@@ -326,7 +328,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
         ServerConfiguration conf = newServerConfiguration();
         Bookie deadBookie1 = new Bookie(conf) {
             @Override
-            public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 // drop request to simulate a slow and failed bookie
                 throw new IOException("Couldn't write for some reason");
@@ -410,7 +412,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
     private void startDeadBookie(ServerConfiguration conf) throws Exception {
         Bookie rBookie = new Bookie(conf) {
             @Override
-            public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+            public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 // drop request to simulate a dead bookie
                 throw new IOException("Couldn't write entries for some reason");

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 280db05..0db938b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,14 +21,16 @@
 
 package org.apache.bookkeeper.meta;
 
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -336,11 +338,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         }
 
         @Override
-        public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+        public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
         }
 
         @Override
-        public ByteBuffer getExplicitLac(long ledgerId) {
+        public ByteBuf getExplicitLac(long ledgerId) {
             return null;
         }
 
@@ -369,12 +371,12 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         }
 
         @Override
-        public long addEntry(ByteBuffer entry) throws IOException {
+        public long addEntry(ByteBuf entry) throws IOException {
             return 0;
         }
 
         @Override
-        public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+        public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 8d13102..ac9fe59 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -22,7 +22,6 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -35,7 +34,6 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
-import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -49,6 +47,8 @@ import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * Test case to run over serveral ledger managers
  */
@@ -153,12 +153,12 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         }
 
         @Override
-        public long addEntry(ByteBuffer entry) throws IOException {
+        public long addEntry(ByteBuf entry) throws IOException {
             return 0;
         }
 
         @Override
-        public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+        public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
             return null;
         }
 
@@ -213,13 +213,13 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         }
 
         @Override
-        public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+        public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
             // TODO Auto-generated method stub
 
         }
 
         @Override
-        public ByteBuffer getExplicitLac(long ledgerId) {
+        public ByteBuf getExplicitLac(long ledgerId) {
             // TODO Auto-generated method stub
             return null;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 16bbfd0..e2a4f32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -235,7 +235,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
 
         Bookie delayBookie = new Bookie(conf) {
             @Override
-            public ByteBuffer readEntry(long ledgerId, long entryId)
+            public ByteBuf readEntry(long ledgerId, long entryId)
                     throws IOException, NoLedgerException {
                 try {
                     Thread.sleep(3000);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 428a597..159b859 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -40,6 +40,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
@@ -251,7 +253,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
 
         Bookie deadBookie = new Bookie(conf) {
             @Override
-            public ByteBuffer readEntry(long ledgerId, long entryId)
+            public ByteBuf readEntry(long ledgerId, long entryId)
                     throws IOException, NoLedgerException {
                 // we want to disable during checking
                 numReads.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
index 94319df..7a65388 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
@@ -21,6 +21,9 @@
 
 package org.apache.bookkeeper.test;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -148,12 +151,12 @@ public class ConcurrentLedgerTest {
         long start = System.currentTimeMillis();
         for(int i = 1; i <= totalwrites/ledgers; i++) {
             for(int j = 1; j <= ledgers; j++) {
-                ByteBuffer entry = bookie.readEntry(j, i);
+                ByteBuf entry = bookie.readEntry(j, i);
                 // skip the ledger id and the entry id
-                entry.getLong();
-                entry.getLong();
-                assertEquals(j + "@" + i, j+2, entry.getLong());
-                assertEquals(j + "@" + i, i+3, entry.getLong());
+                entry.readLong();
+                entry.readLong();
+                assertEquals(j + "@" + i, j+2, entry.readLong());
+                assertEquals(j + "@" + i, i+3, entry.readLong());
             }
         }
         long finish = System.currentTimeMillis();
@@ -184,7 +187,7 @@ public class ConcurrentLedgerTest {
                 bytes.position(0);
                 bytes.limit(bytes.capacity());
                 throttle.acquire();
-                bookie.addEntry(bytes, cb, counter, zeros);
+                bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter, zeros);
             }
         }
         long finish = System.currentTimeMillis();