You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/03 19:31:51 UTC

bookkeeper git commit: BOOKKEEPER-1018: Allow client to select older V2 protocol (no protobuf)

Repository: bookkeeper
Updated Branches:
  refs/heads/master 9836c87dc -> 9001e300c


BOOKKEEPER-1018: Allow client to select older V2 protocol (no protobuf)

Originally done by Matteo Merli (merlimat). Tagging sijie and eolivelli  for review.

Author: Govind Menon <go...@gmail.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #120 from govind-menon/BOOKKEEPER-1018


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9001e300
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9001e300
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9001e300

Branch: refs/heads/master
Commit: 9001e300ce0d5d2655d437e3eaa52f91487caed6
Parents: 9836c87
Author: Govind Menon <go...@gmail.com>
Authored: Mon Apr 3 12:31:47 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 3 12:31:47 2017 -0700

----------------------------------------------------------------------
 .../bookkeeper/conf/ClientConfiguration.java    |  23 +-
 .../apache/bookkeeper/proto/AuthHandler.java    |   8 +
 .../apache/bookkeeper/proto/BookieProtocol.java |   3 +
 .../proto/PerChannelBookieClient.java           | 436 +++++++++++++------
 4 files changed, 336 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index ee137c0..2b75e9e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -57,7 +57,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String CLIENT_WRITEBUFFER_HIGH_WATER_MARK = "clientWriteBufferHighWaterMark";
     protected final static String CLIENT_CONNECT_TIMEOUT_MILLIS = "clientConnectTimeoutMillis";
     protected final static String NUM_CHANNELS_PER_BOOKIE = "numChannelsPerBookie";
-
+    protected final static String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol";
     // Read Parameters
     protected final static String READ_TIMEOUT = "readTimeout";
     protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
@@ -436,6 +436,27 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Use older Bookkeeper wire protocol (no protobuf)
+     *
+     * @return whether or not to use older Bookkeeper wire protocol (no protobuf)
+     */
+    public boolean getUseV2WireProtocol() {
+        return getBoolean(USE_V2_WIRE_PROTOCOL, false);
+    }
+
+    /**
+     * Set whether or not to use older Bookkeeper wire protocol (no protobuf)
+     *
+     * @param useV2WireProtocol
+     *          whether or not to use older Bookkeeper wire protocol (no protobuf)
+     * @return client configuration.
+     */
+    public ClientConfiguration setUseV2WireProtocol(boolean useV2WireProtocol) {
+        setProperty(USE_V2_WIRE_PROTOCOL, useV2WireProtocol);
+        return this;
+    }
+
+    /**
      * Get zookeeper servers to connect
      *
      * @return zookeeper servers

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 75dced5..d2608e7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -322,6 +322,14 @@ class AuthHandler {
                     } else {
                         waitingForAuth.add(e);
                     }
+                } else if (e.getMessage() instanceof BookieProtocol.Request) {
+                    // let auth messages through, queue the rest
+                    BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage();
+                    if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
+                        super.writeRequested(ctx, e);
+                    } else {
+                        waitingForAuth.add(e);
+                    }
                 } // else just drop
             }
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 2ce5ed8..1191d3c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -140,6 +140,9 @@ public interface BookieProtocol {
      * by the auth providers themselves.
      */
     public static final byte AUTH = 3;
+    public static final byte READ_LAC = 4;
+    public static final byte WRITE_LAC = 5;
+    public static final byte GET_BOOKIE_INFO = 6;
 
     /**
      * The error code that indicates success

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9001e300/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f6e9e8f..a24bb1e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.Collection;
 import java.util.Queue;
 import java.util.Set;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -98,7 +100,7 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
 import java.net.SocketAddress;
-import java.util.Collection;
+
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.jboss.netty.channel.ChannelFactory;
 import org.apache.bookkeeper.client.ClientConnectionPeer;
@@ -147,6 +149,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private final OpStatsLogger getBookieInfoOpLogger;
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
 
+    private final boolean useV2WireProtocol;
+
     /**
      * The following member variables do not need to be concurrent, or volatile
      * because they are always updated under a lock
@@ -202,6 +206,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         this.addEntryTimeout = conf.getAddEntryTimeout();
         this.readEntryTimeout = conf.getReadEntryTimeout();
         this.getBookieInfoTimeout = conf.getBookieInfoTimeout();
+        this.useV2WireProtocol = conf.getUseV2WireProtocol();
 
         this.authProviderFactory = authProviderFactory;
         this.extRegistry = extRegistry;
@@ -469,33 +474,45 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
                   Object ctx, final int options) {
-        final long txnId = getTxnId();
-        final int entrySize = toSend.readableBytes();
-        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY);
-        completionObjects.put(completionKey,
-                new AddCompletion(this, addEntryOpLogger, cb, ctx, ledgerId, entryId,
-                                  scheduleTimeout(completionKey, addEntryTimeout)));
-
-        // Build the request and calculate the total size to be included in the packet.
-        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                .setVersion(ProtocolVersion.VERSION_THREE)
-                .setOperation(OperationType.ADD_ENTRY)
-                .setTxnId(txnId);
+        Object request = null;
+        CompletionKey completion = null;
+        if (useV2WireProtocol) {
+            completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
+            request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+                    (short) options, masterKey, toSend);
 
-        AddRequest.Builder addBuilder = AddRequest.newBuilder()
-                .setLedgerId(ledgerId)
-                .setEntryId(entryId)
-                .setMasterKey(ByteString.copyFrom(masterKey))
-                .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
 
-        if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
-            addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+        } else {
+            final long txnId = getTxnId();
+            completion = new CompletionKey(txnId, OperationType.ADD_ENTRY);
+            // Build the request and calculate the total size to be included in the packet.
+            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                    .setVersion(ProtocolVersion.VERSION_THREE)
+                    .setOperation(OperationType.ADD_ENTRY)
+                    .setTxnId(txnId);
+
+            AddRequest.Builder addBuilder = AddRequest.newBuilder()
+                    .setLedgerId(ledgerId)
+                    .setEntryId(entryId)
+                    .setMasterKey(ByteString.copyFrom(masterKey))
+                    .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+
+            if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
+                addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+            }
+            request = Request.newBuilder()
+                    .setHeader(headerBuilder)
+                    .setAddRequest(addBuilder)
+                    .build();
         }
 
-        final Request addRequest = Request.newBuilder()
-                .setHeader(headerBuilder)
-                .setAddRequest(addBuilder)
-                .build();
+        final Object addRequest = request;
+        final CompletionKey completionKey = completion;
+
+        completionObjects.put(completionKey, new AddCompletion(this,
+                addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout)));
+
+        final int entrySize = toSend.readableBytes();
 
         final Channel c = channel;
         if (c == null) {
@@ -531,28 +548,44 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
                                         final long entryId,
                                         ReadEntryCallback cb, Object ctx) {
-        final long txnId = getTxnId();
-        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
-        completionObjects.put(completionKey,
-                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
-                                   scheduleTimeout(completionKey, readEntryTimeout)));
-
-        // Build the request and calculate the total size to be included in the packet.
-        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                .setVersion(ProtocolVersion.VERSION_THREE)
-                .setOperation(OperationType.READ_ENTRY)
-                .setTxnId(txnId);
-
-        ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
-                .setLedgerId(ledgerId)
-                .setEntryId(entryId)
-                .setMasterKey(ByteString.copyFrom(masterKey))
-                .setFlag(ReadRequest.Flag.FENCE_LEDGER);
-
-        final Request readRequest = Request.newBuilder()
-                .setHeader(headerBuilder)
-                .setReadRequest(readBuilder)
-                .build();
+        Object request = null;
+        CompletionKey completion = null;
+        if (useV2WireProtocol) {
+            completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
+                    BookieProtocol.FLAG_DO_FENCING, masterKey);
+        } else {
+            final long txnId = getTxnId();
+            final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+            completionObjects.put(completionKey,
+                    new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+                            scheduleTimeout(completionKey, readEntryTimeout)));
+
+            // Build the request and calculate the total size to be included in the packet.
+            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                    .setVersion(ProtocolVersion.VERSION_THREE)
+                    .setOperation(OperationType.READ_ENTRY)
+                    .setTxnId(txnId);
+
+            ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+                    .setLedgerId(ledgerId)
+                    .setEntryId(entryId)
+                    .setMasterKey(ByteString.copyFrom(masterKey))
+                    .setFlag(ReadRequest.Flag.FENCE_LEDGER);
+
+            final Request readRequest = Request.newBuilder()
+                    .setHeader(headerBuilder)
+                    .setReadRequest(readBuilder)
+                    .build();
+        }
+
+        final CompletionKey completionKey = completion;
+        if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb,
+                ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) {
+            // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
+            cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
+            return;
+        }
 
         final Channel c = channel;
         if (c == null) {
@@ -560,6 +593,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             return;
         }
 
+        final Object readRequest = request;
         try {
             ChannelFuture future = c.write(readRequest);
             future.addListener(new ChannelFutureListener() {
@@ -586,22 +620,34 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
-        final long txnId = getTxnId();
-        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
+        Object request = null;
+        CompletionKey completion = null;
+        if (useV2WireProtocol) {
+            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                    ledgerId, (long) 0, (short) 0);
+            completion = new V2CompletionKey(ledgerId, (long) 0, OperationType.READ_LAC);
+        } else {
+            final long txnId = getTxnId();
+            final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC);
+
+            // Build the request and calculate the total size to be included in the packet.
+            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                    .setVersion(ProtocolVersion.VERSION_THREE)
+                    .setOperation(OperationType.READ_LAC)
+                    .setTxnId(txnId);
+            ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
+                    .setLedgerId(ledgerId);
+            request = Request.newBuilder()
+                    .setHeader(headerBuilder)
+                    .setReadLacRequest(readLacBuilder)
+                    .build();
+        }
+        final Object readLacRequest = request;
+        final CompletionKey completionKey = completion;
+
         completionObjects.put(completionKey,
                 new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
                         scheduleTimeout(completionKey, readEntryTimeout)));
-        // Build the request and calculate the total size to be included in the packet.
-        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                .setVersion(ProtocolVersion.VERSION_THREE)
-                .setOperation(OperationType.READ_LAC)
-                .setTxnId(txnId);
-        ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
-                .setLedgerId(ledgerId);
-        final Request readLacRequest = Request.newBuilder()
-                .setHeader(headerBuilder)
-                .setReadLacRequest(readLacBuilder)
-                .build();
         final Channel c = channel;
         if (c == null) {
             errorOutReadLacKey(completionKey);
@@ -632,27 +678,37 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
-        final long txnId = getTxnId();
-        final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
-        completionObjects.put(completionKey,
-                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
-                                   scheduleTimeout(completionKey, readEntryTimeout)));
+        Object request = null;
+        CompletionKey completion = null;
+        if (useV2WireProtocol) {
+            request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                    ledgerId, entryId, (short) 0);
+            completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+        } else {
+            final long txnId = getTxnId();
+            completion = new CompletionKey(txnId, OperationType.READ_ENTRY);
 
-        // Build the request and calculate the total size to be included in the packet.
-        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                .setVersion(ProtocolVersion.VERSION_THREE)
-                .setOperation(OperationType.READ_ENTRY)
-                .setTxnId(txnId);
+            // Build the request and calculate the total size to be included in the packet.
+            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+                    .setVersion(ProtocolVersion.VERSION_THREE)
+                    .setOperation(OperationType.READ_ENTRY)
+                    .setTxnId(txnId);
 
-        ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
-                .setLedgerId(ledgerId)
-                .setEntryId(entryId);
+            ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+                    .setLedgerId(ledgerId)
+                    .setEntryId(entryId);
 
-        final Request readRequest = Request.newBuilder()
-                .setHeader(headerBuilder)
-                .setReadRequest(readBuilder)
-                .build();
+            final Request readRequest = Request.newBuilder()
+                    .setHeader(headerBuilder)
+                    .setReadRequest(readBuilder)
+                    .build();
+        }
+        final Object readRequest = request;
+        final CompletionKey completionKey = completion;
 
+        completionObjects.put(completionKey,
+                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
+                        scheduleTimeout(completionKey, readEntryTimeout)));
         final Channel c = channel;
         if (c == null) {
             errorOutReadKey(completionKey);
@@ -803,6 +859,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     void errorOutReadKey(final CompletionKey key, final int rc) {
+        LOG.info("Removing completion key: {}", key);
         final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
         if (null == readCompletion) {
             return;
@@ -835,6 +892,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     void errorOutWriteLacKey(final CompletionKey key, final int rc) {
+        LOG.info("Removing completion key: {}", key);
         final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key);
         if (null == writeLacCompletion) {
             return;
@@ -859,6 +917,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     }
 
     void errorOutReadLacKey(final CompletionKey key, final int rc) {
+        LOG.info("Removing completion key: {}", key);
         final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key);
         if (null == readLacCompletion) {
             return;
@@ -1058,12 +1117,101 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
      */
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        if (!(e.getMessage() instanceof Response)) {
+
+        if (e.getMessage() instanceof BookieProtocol.Response) {
+            BookieProtocol.Response response = (BookieProtocol.Response) e.getMessage();
+            readV2Response(response);
+        } else if (e.getMessage() instanceof Response) {
+            Response response = (Response) e.getMessage();
+            readV3Response(response);
+        } else {
             ctx.sendUpstream(e);
-            return;
         }
+    }
+
+    private void readV2Response(final BookieProtocol.Response response) {
+        final long ledgerId = response.ledgerId;
+        final long entryId = response.entryId;
+
+        final OperationType operationType = getOperationType(response.getOpCode());
+        final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
+
+        final CompletionValue completionValue = completionObjects.remove(new V2CompletionKey(ledgerId, entryId, operationType));
 
-        final Response response = (Response) e.getMessage();
+        if (null == completionValue) {
+            // Unexpected response, so log it. The txnId should have been present.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType
+                        + " and ledger:entry : " + ledgerId + ":" + entryId);
+            }
+        } else {
+            long orderingKey = completionValue.ledgerId;
+
+            executor.submitOrdered(orderingKey, new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    switch (operationType) {
+                        case ADD_ENTRY: {
+                            handleAddResponse(ledgerId, entryId, status, completionValue);
+                        }
+                        case READ_ENTRY: {
+                            BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
+                            handleReadResponse(ledgerId, entryId, status, readResponse.getData(), completionValue);
+                        }
+                        default:
+                            LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr);
+                            break;
+                    }
+                }
+            });
+        }
+    }
+
+    private StatusCode getStatusCodeFromErrorCode(int errorCode) {
+        switch (errorCode) {
+            case BookieProtocol.EOK:
+                return StatusCode.EOK;
+            case BookieProtocol.ENOLEDGER:
+                return StatusCode.ENOLEDGER;
+            case BookieProtocol.ENOENTRY:
+                return StatusCode.ENOENTRY;
+            case BookieProtocol.EBADREQ:
+                return StatusCode.EBADREQ;
+            case BookieProtocol.EIO:
+                return StatusCode.EIO;
+            case BookieProtocol.EUA:
+                return StatusCode.EUA;
+            case BookieProtocol.EBADVERSION:
+                return StatusCode.EBADVERSION;
+            case BookieProtocol.EFENCED:
+                return StatusCode.EFENCED;
+            case BookieProtocol.EREADONLY:
+                return StatusCode.EREADONLY;
+            default:
+                throw new IllegalArgumentException("Invalid error code: " + errorCode);
+        }
+    }
+
+    private OperationType getOperationType(byte opCode) {
+        switch (opCode) {
+            case BookieProtocol.ADDENTRY:
+                return  OperationType.ADD_ENTRY;
+            case BookieProtocol.READENTRY:
+                return OperationType.READ_ENTRY;
+            case BookieProtocol.AUTH:
+                return OperationType.AUTH;
+            case BookieProtocol.READ_LAC:
+                return OperationType.READ_LAC;
+            case BookieProtocol.WRITE_LAC:
+                return OperationType.WRITE_LAC;
+            case BookieProtocol.GET_BOOKIE_INFO:
+                return OperationType.GET_BOOKIE_INFO;
+            default:
+                throw new IllegalArgumentException("Invalid operation type");
+        }
+    }
+
+    private void readV3Response(final Response response) {
         final BKPacketHeader header = response.getHeader();
 
         final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
@@ -1082,21 +1230,51 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 public void safeRun() {
                     OperationType type = header.getOperation();
                     switch (type) {
-                        case ADD_ENTRY:
-                            handleAddResponse(response, completionValue);
+                        case ADD_ENTRY: {
+                            AddResponse addResponse = response.getAddResponse();
+                            StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
+                            handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue);
                             break;
-                        case READ_ENTRY:
-                            handleReadResponse(response, completionValue);
+                        }
+                        case READ_ENTRY: {
+                            ReadResponse readResponse = response.getReadResponse();
+                            StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
+                            ChannelBuffer buffer = ChannelBuffers.buffer(0);
+                            if (readResponse.hasBody()) {
+                                buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+                            }
+                            handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, completionValue);
                             break;
-                        case WRITE_LAC:
-                            handleWriteLacResponse(response.getWriteLacResponse(), completionValue);
+                        }
+                        case WRITE_LAC: {
+                            WriteLacResponse writeLacResponse = response.getWriteLacResponse();
+                            StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus();
+                            handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue);
                             break;
-                        case READ_LAC:
-                            handleReadLacResponse(response.getReadLacResponse(), completionValue);
+                        }
+                        case READ_LAC: {
+                            ReadLacResponse readLacResponse = response.getReadLacResponse();
+                            ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
+                            ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
+                            StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
+                            // Thread.dumpStack();
+
+                            if (readLacResponse.hasLacBody()) {
+                                lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+                            }
+
+                            if (readLacResponse.hasLastEntryBody()) {
+                                lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+                            }
+                            handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue);
                             break;
-                        case GET_BOOKIE_INFO:
-                            handleGetBookieInfoResponse(response, completionValue);
+                        }
+                        case GET_BOOKIE_INFO: {
+                            GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
+                            StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
+                            handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue);
                             break;
+                        }
                         default:
                             LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
                                       type, addr);
@@ -1114,13 +1292,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
-    void handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) {
+    void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) {
         // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
         WriteLacCompletion plc = (WriteLacCompletion)completionValue;
 
-        long ledgerId = writeLacResponse.getLedgerId();
-        StatusCode status = writeLacResponse.getStatus();
-
         LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
 
         // convert to BKException code
@@ -1133,14 +1308,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
     }
 
- void handleAddResponse(Response response, CompletionValue completionValue) {
+ void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) {
         // The completion value should always be an instance of an AddCompletion object when we reach here.
         AddCompletion ac = (AddCompletion)completionValue;
-        AddResponse addResponse = response.getAddResponse();
-
-        long ledgerId = addResponse.getLedgerId();
-        long entryId = addResponse.getEntryId();
-        StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1160,25 +1330,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
     }
 
-    void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) {
+    void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) {
         // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
         ReadLacCompletion glac = (ReadLacCompletion)completionValue;
 
-        long ledgerId = readLacResponse.getLedgerId();
-        StatusCode status = readLacResponse.getStatus();
-        ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
-        ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0);
-
-       // Thread.dumpStack();
-
-        if (readLacResponse.hasLacBody()) {
-            lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
-        }
-
-        if (readLacResponse.hasLastEntryBody()) {
-            lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
-        }
-
         LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status);
         // convert to BKException code
         Integer rcToRet = statusCodeToExceptionCode(status);
@@ -1190,20 +1345,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
     }
 
-    void handleReadResponse(Response response, CompletionValue completionValue) {
+    void handleReadResponse(long ledgerId, long entryId, StatusCode status, ChannelBuffer buffer, CompletionValue completionValue) {
         // The completion value should always be an instance of a ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
-        ReadResponse readResponse = response.getReadResponse();
-
-        long ledgerId = readResponse.getLedgerId();
-        long entryId = readResponse.getEntryId();
-        StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
 
-        ChannelBuffer buffer = ChannelBuffers.buffer(0);
-
-        if (readResponse.hasBody()) {
-            buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
-        }
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -1222,15 +1367,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx);
     }
 
-    void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) {
+    void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity,  StatusCode status, CompletionValue completionValue) {
         // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here.
         GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue;
-        GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
-
-        long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L;
-        long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L;
-
-        StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc);
@@ -1556,4 +1695,35 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         return txnIdGenerator.incrementAndGet();
     }
 
+    private class V2CompletionKey extends CompletionKey {
+        final long ledgerId;
+        final long entryId;
+
+
+        public V2CompletionKey(long ledgerId, long entryId, OperationType operationType) {
+            super(0, operationType);
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (!(object instanceof V2CompletionKey)) {
+                return  false;
+            }
+            V2CompletionKey that = (V2CompletionKey) object;
+            return  this.entryId == that.entryId && this.ledgerId == that.ledgerId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ledgerId, entryId);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%d:%d %s", ledgerId, entryId, operationType);
+        }
+    }
 }