You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/04 01:22:11 UTC

bookkeeper git commit: BOOKKEEPER-1018: Revert ": Allow client to select older V2 protocol (no…

Repository: bookkeeper
Updated Branches:
  refs/heads/master 9001e300c -> f30f60889


BOOKKEEPER-1018: Revert ": Allow client to select older V2 protocol (no…

… protobuf)"

This reverts commit 9001e300ce0d5d2655d437e3eaa52f91487caed6.

I broke trunk - not exactly sure how - I will fix it and put up the PR again. For now I'm reverting the commit.

Author: Govind Menon <go...@gmail.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #124 from govind-menon/BOOKKEEPER-1018-Revert


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/f30f6088
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/f30f6088
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/f30f6088

Branch: refs/heads/master
Commit: f30f60889e0810a47797daf4107c0d9bc2ee998c
Parents: 9001e30
Author: Govind Menon <go...@gmail.com>
Authored: Mon Apr 3 18:22:07 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 3 18:22:07 2017 -0700

----------------------------------------------------------------------
 .../bookkeeper/conf/ClientConfiguration.java    |  23 +-
 .../apache/bookkeeper/proto/AuthHandler.java    |   8 -
 .../apache/bookkeeper/proto/BookieProtocol.java |   3 -
 .../proto/PerChannelBookieClient.java           | 436 ++++++-------------
 4 files changed, 134 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 2b75e9e..ee137c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -57,7 +57,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String CLIENT_WRITEBUFFER_HIGH_WATER_MARK = "clientWriteBufferHighWaterMark";
     protected final static String CLIENT_CONNECT_TIMEOUT_MILLIS = "clientConnectTimeoutMillis";
     protected final static String NUM_CHANNELS_PER_BOOKIE = "numChannelsPerBookie";
-    protected final static String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol";
+
     // Read Parameters
     protected final static String READ_TIMEOUT = "readTimeout";
     protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
@@ -436,27 +436,6 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
-     * Use older Bookkeeper wire protocol (no protobuf)
-     *
-     * @return whether or not to use older Bookkeeper wire protocol (no protobuf)
-     */
-    public boolean getUseV2WireProtocol() {
-        return getBoolean(USE_V2_WIRE_PROTOCOL, false);
-    }
-
-    /**
-     * Set whether or not to use older Bookkeeper wire protocol (no protobuf)
-     *
-     * @param useV2WireProtocol
-     *          whether or not to use older Bookkeeper wire protocol (no protobuf)
-     * @return client configuration.
-     */
-    public ClientConfiguration setUseV2WireProtocol(boolean useV2WireProtocol) {
-        setProperty(USE_V2_WIRE_PROTOCOL, useV2WireProtocol);
-        return this;
-    }
-
-    /**
      * Get zookeeper servers to connect
      *
      * @return zookeeper servers

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index d2608e7..75dced5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -322,14 +322,6 @@ class AuthHandler {
                     } else {
                         waitingForAuth.add(e);
                     }
-                } else if (e.getMessage() instanceof BookieProtocol.Request) {
-                    // let auth messages through, queue the rest
-                    BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage();
-                    if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
-                        super.writeRequested(ctx, e);
-                    } else {
-                        waitingForAuth.add(e);
-                    }
                 } // else just drop
             }
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 1191d3c..2ce5ed8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -140,9 +140,6 @@ public interface BookieProtocol {
      * by the auth providers themselves.
      */
     public static final byte AUTH = 3;
-    public static final byte READ_LAC = 4;
-    public static final byte WRITE_LAC = 5;
-    public static final byte GET_BOOKIE_INFO = 6;
 
     /**
      * The error code that indicates success

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/f30f6088/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index a24bb1e..f6e9e8f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.Collections;
-import java.util.Collection;
 import java.util.Queue;
 import java.util.Set;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -100,7 +98,7 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
 import java.net.SocketAddress;
-
+import java.util.Collection;
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.jboss.netty.channel.ChannelFactory;
 import org.apache.bookkeeper.client.ClientConnectionPeer;
@@ -149,8 +147,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private final OpStatsLogger getBookieInfoOpLogger;
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
 
-    private final boolean useV2WireProtocol;
-
     /**
      * The following member variables do not need to be concurrent, or volatile
      * because they are always updated under a lock
@@ -206,7 +202,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         this.addEntryTimeout = conf.getAddEntryTimeout();
         this.readEntryTimeout = conf.getReadEntryTimeout();
         this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
-        this.useV2WireProtocol = conf.getUseV2WireProtocol();
 
         this.authProviderFactory = authProviderFactory;
         this.extRegistry = extRegistry;
@@ -474,45 +469,33 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
                   Object ctx, final int options) {
-        Object request = null;
-        CompletionKey completion = null;
-        if (useV2WireProtocol) {
-            completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
-            request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
-                    (short) options, masterKey, toSend);
-
+        final long txnId = getTxnId();
+        final int entrySize = toSend.readableBytes();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY);
+        completionObjects.put(completionKey,
+                new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, entryId,
+                                  scheduleTimeout(completionKey, addEntryTimeout)));
 
-        } else {
-            final long txnId = getTxnId();
-            completion = new CompletionKey(txnId, OperationType.ADD_ENTRY);
-            // Build the request and calculate the total size to be included in the packet.
-            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                    .setVersion(ProtocolVersion.VERSION_THREE)
-                    .setOperation(OperationType.ADD_ENTRY)
-                    .setTxnId(txnId);
-
-            AddRequest.Builder addBuilder = AddRequest.newBuilder()
-                    .setLedgerId(ledgerId)
-                    .setEntryId(entryId)
-                    .setMasterKey(ByteString.copyFrom(masterKey))
-                    .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
-
-            if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
-                addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
-            }
-            request = Request.newBuilder()
-                    .setHeader(headerBuilder)
-                    .setAddRequest(addBuilder)
-                    .build();
-        }
+        // Build the request and calculate the total size to be included in the packet.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.ADD_ENTRY)
+                .setTxnId(txnId);
 
-        final Object addRequest = request;
-        final CompletionKey completionKey = completion;
+        AddRequest.Builder addBuilder = AddRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .setMasterKey(ByteString.copyFrom(masterKey))
+                .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
 
-        completionObjects.put(completionKey, new AddCompletion(this,
-                addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout)));
+        if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
+            addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+        }
 
-        final int entrySize = toSend.readableBytes();
+        final Request addRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setAddRequest(addBuilder)
+                .build();
 
         final Channel c = channel;
         if (c == null) {
@@ -548,44 +531,28 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
                                         final long entryId,
                                         ReadEntryCallback cb, Object ctx) {
-        Object request = null;
-        CompletionKey completion = null;
-        if (useV2WireProtocol) {
-            completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
-            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
-                    BookieProtocol.FLAG_DO_FENCING, masterKey);
-        } else {
-            final long txnId = getTxnId();
-            final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
-            completionObjects.put(completionKey,
-                    new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
-                            scheduleTimeout(completionKey, readEntryTimeout)));
-
-            // Build the request and calculate the total size to be included in the packet.
-            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                    .setVersion(ProtocolVersion.VERSION_THREE)
-                    .setOperation(OperationType.READ_ENTRY)
-                    .setTxnId(txnId);
-
-            ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
-                    .setLedgerId(ledgerId)
-                    .setEntryId(entryId)
-                    .setMasterKey(ByteString.copyFrom(masterKey))
-                    .setFlag(ReadRequest.Flag.FENCE_LEDGER);
-
-            final Request readRequest = Request.newBuilder()
-                    .setHeader(headerBuilder)
-                    .setReadRequest(readBuilder)
-                    .build();
-        }
-
-        final CompletionKey completionKey = completion;
-        if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb,
-                ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) {
-            // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
-            cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
-            return;
-        }
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+        completionObjects.put(completionKey,
+                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+                                   scheduleTimeout(completionKey, readEntryTimeout)));
+
+        // Build the request and calculate the total size to be included in the packet.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.READ_ENTRY)
+                .setTxnId(txnId);
+
+        ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId)
+                .setMasterKey(ByteString.copyFrom(masterKey))
+                .setFlag(ReadRequest.Flag.FENCE_LEDGER);
+
+        final Request readRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setReadRequest(readBuilder)
+                .build();
 
         final Channel c = channel;
         if (c == null) {
@@ -593,7 +560,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             return;
         }
 
-        final Object readRequest = request;
         try {
             ChannelFuture future = c.write(readRequest);
             future.addListener(new ChannelFutureListener() {
@@ -620,34 +586,22 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
-        Object request = null;
-        CompletionKey completion = null;
-        if (useV2WireProtocol) {
-            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                    ledgerId, (long) 0, (short) 0);
-            completion = new V2CompletionKey(ledgerId, (long) 0, OperationType.READ_LAC);
-        } else {
-            final long txnId = getTxnId();
-            final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
-
-            // Build the request and calculate the total size to be included in the packet.
-            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                    .setVersion(ProtocolVersion.VERSION_THREE)
-                    .setOperation(OperationType.READ_LAC)
-                    .setTxnId(txnId);
-            ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
-                    .setLedgerId(ledgerId);
-            request = Request.newBuilder()
-                    .setHeader(headerBuilder)
-                    .setReadLacRequest(readLacBuilder)
-                    .build();
-        }
-        final Object readLacRequest = request;
-        final CompletionKey completionKey = completion;
-
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
         completionObjects.put(completionKey,
                 new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
                         scheduleTimeout(completionKey, readEntryTimeout)));
+        // Build the request and calculate the total size to be included in the packet.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.READ_LAC)
+                .setTxnId(txnId);
+        ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
+                .setLedgerId(ledgerId);
+        final Request readLacRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setReadLacRequest(readLacBuilder)
+                .build();
         final Channel c = channel;
         if (c == null) {
             errorOutReadLacKey(completionKey);
@@ -678,37 +632,27 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
-        Object request = null;
-        CompletionKey completion = null;
-        if (useV2WireProtocol) {
-            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                    ledgerId, entryId, (short) 0);
-            completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
-        } else {
-            final long txnId = getTxnId();
-            completion = new CompletionKey(txnId, OperationType.READ_ENTRY);
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+        completionObjects.put(completionKey,
+                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+                                   scheduleTimeout(completionKey, readEntryTimeout)));
 
-            // Build the request and calculate the total size to be included in the packet.
-            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                    .setVersion(ProtocolVersion.VERSION_THREE)
-                    .setOperation(OperationType.READ_ENTRY)
-                    .setTxnId(txnId);
+        // Build the request and calculate the total size to be included in the packet.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.READ_ENTRY)
+                .setTxnId(txnId);
 
-            ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
-                    .setLedgerId(ledgerId)
-                    .setEntryId(entryId);
+        ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId);
 
-            final Request readRequest = Request.newBuilder()
-                    .setHeader(headerBuilder)
-                    .setReadRequest(readBuilder)
-                    .build();
-        }
-        final Object readRequest = request;
-        final CompletionKey completionKey = completion;
+        final Request readRequest = Request.newBuilder()
+                .setHeader(headerBuilder)
+                .setReadRequest(readBuilder)
+                .build();
 
-        completionObjects.put(completionKey,
-                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
-                        scheduleTimeout(completionKey, readEntryTimeout)));
         final Channel c = channel;
         if (c == null) {
             errorOutReadKey(completionKey);
@@ -859,7 +803,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     void errorOutReadKey(final CompletionKey key, final int rc) {
-        LOG.info("Removing completion key: {}", key);
         final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
         if (null == readCompletion) {
             return;
@@ -892,7 +835,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     void errorOutWriteLacKey(final CompletionKey key, final int rc) {
-        LOG.info("Removing completion key: {}", key);
         final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key);
         if (null == writeLacCompletion) {
             return;
@@ -917,7 +859,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     void errorOutReadLacKey(final CompletionKey key, final int rc) {
-        LOG.info("Removing completion key: {}", key);
         final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key);
         if (null == readLacCompletion) {
             return;
@@ -1117,101 +1058,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
      */
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-
-        if (e.getMessage() instanceof BookieProtocol.Response) {
-            BookieProtocol.Response response = (BookieProtocol.Response) e.getMessage();
-            readV2Response(response);
-        } else if (e.getMessage() instanceof Response) {
-            Response response = (Response) e.getMessage();
-            readV3Response(response);
-        } else {
+        if (!(e.getMessage() instanceof Response)) {
             ctx.sendUpstream(e);
+            return;
         }
-    }
-
-    private void readV2Response(final BookieProtocol.Response response) {
-        final long ledgerId = response.ledgerId;
-        final long entryId = response.entryId;
-
-        final OperationType operationType = getOperationType(response.getOpCode());
-        final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
-
-        final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType));
-
-        if (null == completionValue) {
-            // Unexpected response, so log it. The txnId should have been present.
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType
-                        + " and ledger:entry : " + ledgerId + ":" + entryId);
-            }
-        } else {
-            long orderingKey = completionValue.ledgerId;
-
-            executor.submitOrdered(orderingKey, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    switch (operationType) {
-                        case ADD_ENTRY: {
-                            handleAddResponse(ledgerId, entryId, status, completionValue);
-                        }
-                        case READ_ENTRY: {
-                            BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
-                            handleReadResponse(ledgerId, entryId, status, readResponse.getData(), completionValue);
-                        }
-                        default:
-                            LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr);
-                            break;
-                    }
-                }
-            });
-        }
-    }
 
-    private StatusCode getStatusCodeFromErrorCode(int errorCode) {
-        switch (errorCode) {
-            case BookieProtocol.EOK:
-                return StatusCode.EOK;
-            case BookieProtocol.ENOLEDGER:
-                return StatusCode.ENOLEDGER;
-            case BookieProtocol.ENOENTRY:
-                return StatusCode.ENOENTRY;
-            case BookieProtocol.EBADREQ:
-                return StatusCode.EBADREQ;
-            case BookieProtocol.EIO:
-                return StatusCode.EIO;
-            case BookieProtocol.EUA:
-                return StatusCode.EUA;
-            case BookieProtocol.EBADVERSION:
-                return StatusCode.EBADVERSION;
-            case BookieProtocol.EFENCED:
-                return StatusCode.EFENCED;
-            case BookieProtocol.EREADONLY:
-                return StatusCode.EREADONLY;
-            default:
-                throw new IllegalArgumentException("Invalid error code: " + errorCode);
-        }
-    }
-
-    private OperationType getOperationType(byte opCode) {
-        switch (opCode) {
-            case BookieProtocol.ADDENTRY:
-                return  OperationType.ADD_ENTRY;
-            case BookieProtocol.READENTRY:
-                return OperationType.READ_ENTRY;
-            case BookieProtocol.AUTH:
-                return OperationType.AUTH;
-            case BookieProtocol.READ_LAC:
-                return OperationType.READ_LAC;
-            case BookieProtocol.WRITE_LAC:
-                return OperationType.WRITE_LAC;
-            case BookieProtocol.GET_BOOKIE_INFO:
-                return OperationType.GET_BOOKIE_INFO;
-            default:
-                throw new IllegalArgumentException("Invalid operation type");
-        }
-    }
-
-    private void readV3Response(final Response response) {
+        final Response response = (Response) e.getMessage();
         final BKPacketHeader header = response.getHeader();
 
         final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
@@ -1230,51 +1082,21 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 public void safeRun() {
                     OperationType type = header.getOperation();
                     switch (type) {
-                        case ADD_ENTRY: {
-                            AddResponse addResponse = response.getAddResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
-                            handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue);
+                        case ADD_ENTRY:
+                            handleAddResponse(response, completionValue);
                             break;
-                        }
-                        case READ_ENTRY: {
-                            ReadResponse readResponse = response.getReadResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
-                            ChannelBuffer buffer = ChannelBuffers.buffer(0);
-                            if (readResponse.hasBody()) {
-                                buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
-                            }
-                            handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, completionValue);
+                        case READ_ENTRY:
+                            handleReadResponse(response, completionValue);
                             break;
-                        }
-                        case WRITE_LAC: {
-                            WriteLacResponse writeLacResponse = response.getWriteLacResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus();
-                            handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue);
+                        case WRITE_LAC:
+                            handleWriteLacResponse(response.getWriteLacResponse(), completionValue);
                             break;
-                        }
-                        case READ_LAC: {
-                            ReadLacResponse readLacResponse = response.getReadLacResponse();
-                            ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
-                            ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
-                            // Thread.dumpStack();
-
-                            if (readLacResponse.hasLacBody()) {
-                                lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
-                            }
-
-                            if (readLacResponse.hasLastEntryBody()) {
-                                lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
-                            }
-                            handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue);
+                        case READ_LAC:
+                            handleReadLacResponse(response.getReadLacResponse(), completionValue);
                             break;
-                        }
-                        case GET_BOOKIE_INFO: {
-                            GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
-                            handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue);
+                        case GET_BOOKIE_INFO:
+                            handleGetBookieInfoResponse(response, completionValue);
                             break;
-                        }
                         default:
                             LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
                                       type, addr);
@@ -1292,10 +1114,13 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
-    void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) {
+    void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) {
         // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
         WriteLacCompletion plc = (WriteLacCompletion)completionValue;
 
+        long ledgerId = writeLacResponse.getLedgerId();
+        StatusCode status = writeLacResponse.getStatus();
+
         LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
 
         // convert to BKException code
@@ -1308,9 +1133,14 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
     }
 
- void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) {
+ void handleAddResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of an AddCompletion object when we reach here.
         AddCompletion ac = (AddCompletion)completionValue;
+        AddResponse addResponse = response.getAddResponse();
+
+        long ledgerId = addResponse.getLedgerId();
+        long entryId = addResponse.getEntryId();
+        StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1330,10 +1160,25 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
     }
 
-    void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) {
+    void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) {
         // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
         ReadLacCompletion glac = (ReadLacCompletion)completionValue;
 
+        long ledgerId = readLacResponse.getLedgerId();
+        StatusCode status = readLacResponse.getStatus();
+        ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
+        ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
+
+       // Thread.dumpStack();
+
+        if (readLacResponse.hasLacBody()) {
+            lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+        }
+
+        if (readLacResponse.hasLastEntryBody()) {
+            lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+        }
+
         LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
         // convert to BKException code
         Integer rcToRet = statusCodeToExceptionCode(status);
@@ -1345,10 +1190,20 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
     }
 
-    void handleReadResponse(long ledgerId, long entryId, StatusCode status, ChannelBuffer buffer, CompletionValue completionValue) {
+    void handleReadResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of a ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
+        ReadResponse readResponse = response.getReadResponse();
+
+        long ledgerId = readResponse.getLedgerId();
+        long entryId = readResponse.getEntryId();
+        StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
 
+        ChannelBuffer buffer = ChannelBuffers.buffer(0);
+
+        if (readResponse.hasBody()) {
+            buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+        }
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1367,9 +1222,15 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx);
     }
 
-    void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity,  StatusCode status, CompletionValue completionValue) {
+    void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here.
         GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue;
+        GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
+
+        long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L;
+        long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L;
+
+        StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc);
@@ -1695,35 +1556,4 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         return txnIdGenerator.incrementAndGet();
     }
 
-    private class V2CompletionKey extends CompletionKey {
-        final long ledgerId;
-        final long entryId;
-
-
-        public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) {
-            super(0, operationType);
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-
-        }
-
-        @Override
-        public boolean equals(Object object) {
-            if (!(object instanceof V2CompletionKey)) {
-                return  false;
-            }
-            V2CompletionKey that = (V2CompletionKey) object;
-            return  this.entryId == that.entryId && this.ledgerId == that.ledgerId;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(ledgerId, entryId);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("%d:%d %s", ledgerId, entryId, operationType);
-        }
-    }
 }