You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/10/10 09:28:54 UTC

[bookkeeper] branch master updated: ISSUE #525: Refactor PerChannelBookieClient

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f359f  ISSUE #525: Refactor PerChannelBookieClient
79f359f is described below

commit 79f359ff9b77cfce23d52d7a99715768497ee00f
Author: Ivan Kelly <iv...@ivankelly.net>
AuthorDate: Tue Oct 10 11:28:39 2017 +0200

    ISSUE #525: Refactor PerChannelBookieClient
    
    There's a lot of duplicated code in PerChannelBookieClient, particularly in the completion classes, which makes it more difficult to add anything to the class.
    
    Concretely the patch:
    - Factors out common code from PCBC completions
    - Makes erroring out a member method of the completion classes
    - Refactors out writing and flushing of messages, so that all RPCs use the same code path
    - Moves timeout handling into CompletionValue
    - Moves response handling into completions
    - Moves logging and status conversion into a common method
    
    Author: Ivan Kelly <iv...@ivankelly.net>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #526 from ivankelly/refactor-pcbc, closes #525
---
 .../bookkeeper/client/BookKeeperClientStats.java   |    1 +
 .../bookkeeper/proto/PerChannelBookieClient.java   | 1304 ++++++++------------
 2 files changed, 514 insertions(+), 791 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index a0389c1..091b920 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -63,4 +63,5 @@ public interface BookKeeperClientStats {
     public final static String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
     public final static String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
     public final static String CHANNEL_START_TLS_OP = "START_TLS";
+    public final static String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 20f601c..dcc58f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.proto;
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
 import com.google.common.collect.Sets;
+import com.google.common.base.Joiner;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
 import io.netty.bootstrap.Bootstrap;
@@ -170,6 +171,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     private final OpStatsLogger getBookieInfoOpLogger;
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
     private final OpStatsLogger startTLSOpLogger;
+    private final OpStatsLogger startTLSTimeoutOpLogger;
 
     private final boolean useV2WireProtocol;
 
@@ -269,6 +271,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
         getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
         startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
+        startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
 
         this.pcbcPool = pcbcPool;
 
@@ -477,10 +480,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBuf toSend, WriteLacCallback cb,
             Object ctx) {
         final long txnId = getTxnId();
-        final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.WRITE_LAC);
+        final CompletionKey completionKey = new V3CompletionKey(txnId,
+                                                                OperationType.WRITE_LAC);
         // writeLac is mostly like addEntry hence uses addEntryTimeout
         completionObjects.put(completionKey,
-                new WriteLacCompletion(writeLacOpLogger, cb, ctx, lac, scheduleTimeout(completionKey, addEntryTimeout)));
+                              new WriteLacCompletion(completionKey, cb,
+                                                     ctx, lac));
 
         // Build the request
         BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -497,35 +502,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 .setHeader(headerBuilder)
                 .setWriteLacRequest(writeLacBuilder)
                 .build();
-
-        final Channel c = channel;
-        if (c == null) {
-            errorOutWriteLacKey(completionKey);
-            return;
-        }
-        try {
-            ChannelFuture future = c.writeAndFlush(writeLacRequest);
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Successfully wrote request for writeLac LedgerId: {} bookie: {}",
-                                    ledgerId, c.remoteAddress());
-                        }
-                    } else {
-                        if (!(future.cause() instanceof ClosedChannelException)) {
-                            LOG.warn("Writing Lac(lid={} to channel {} failed : ",
-                                    new Object[] { ledgerId, c, future.cause() });
-                        }
-                        errorOutWriteLacKey(completionKey);
-                    }
-                }
-            });
-        } catch (Throwable e) {
-            LOG.warn("writeLac operation failed", e);
-            errorOutWriteLacKey(completionKey);
-        }
+        writeAndFlush(channel, completionKey, writeLacRequest);
     }
 
     /**
@@ -550,14 +527,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf toSend, WriteCallback cb,
                   Object ctx, final int options) {
         Object request = null;
-        CompletionKey completion = null;
+        CompletionKey completionKey = null;
         if (useV2WireProtocol) {
-            completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
+            completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY);
             request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
                     (short) options, masterKey, toSend);
         } else {
             final long txnId = getTxnId();
-            completion = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
+            completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
             // Build the request and calculate the total size to be included in the packet.
             BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
                     .setVersion(ProtocolVersion.VERSION_THREE)
@@ -582,43 +559,18 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     .build();
         }
 
-        final Object addRequest = request;
-        final CompletionKey completionKey = completion;
-
-        completionObjects.put(completionKey, new AddCompletion(this,
-                addEntryOpLogger, cb, ctx, ledgerId, entryId, scheduleTimeout(completion, addEntryTimeout)));
-
-        final int entrySize = toSend.readableBytes();
-
+        completionObjects.put(completionKey,
+                              new AddCompletion(completionKey,
+                                                cb, ctx, ledgerId, entryId));
         final Channel c = channel;
         if (c == null) {
-            errorOutAddKey(completionKey);
+            // usually checked in writeAndFlush, but we have extra check
+            // because we need to release toSend.
+            errorOut(completionKey);
             toSend.release();
             return;
-        }
-        try {
-            ChannelFuture future = c.writeAndFlush(addRequest);
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
-                                                            + " bookie: " + c.remoteAddress() + " entry length: " + entrySize);
-                        }
-                        // totalBytesOutstanding.addAndGet(entrySize);
-                    } else {
-                        if (!(future.cause() instanceof ClosedChannelException)) {
-                            LOG.warn("Writing addEntry(lid={}, eid={}) to channel {} failed : ",
-                                    new Object[] { ledgerId, entryId, c, future.cause() });
-                        }
-                        errorOutAddKey(completionKey);
-                    }
-                }
-            });
-        } catch (Throwable e) {
-            LOG.warn("Add entry operation failed", e);
-            errorOutAddKey(completionKey);
+        } else {
+            writeAndFlush(c, completionKey, request);
         }
     }
 
@@ -626,14 +578,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                         final long entryId,
                                         ReadEntryCallback cb, Object ctx) {
         Object request = null;
-        CompletionKey completion = null;
+        CompletionKey completionKey = null;
         if (useV2WireProtocol) {
-            completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+            completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
             request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
                     BookieProtocol.FLAG_DO_FENCING, masterKey);
         } else {
             final long txnId = getTxnId();
-            completion = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
+            completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
 
             // Build the request and calculate the total size to be included in the packet.
             BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -653,56 +605,28 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     .build();
         }
 
-        final CompletionKey completionKey = completion;
-        if (completionObjects.putIfAbsent(completionKey, new ReadCompletion(this, readEntryOpLogger, cb,
-                ctx, ledgerId, entryId, scheduleTimeout(completionKey, readEntryTimeout))) != null) {
+        if (completionObjects.putIfAbsent(
+                    completionKey, new ReadCompletion(completionKey,
+                                                      cb, ctx,
+                                                      ledgerId, entryId)) != null) {
             // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
             cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException, ledgerId, entryId, null, ctx);
             return;
         }
 
-        final Channel c = channel;
-        if (c == null) {
-            errorOutReadKey(completionKey);
-            return;
-        }
-
-        final Object readRequest = request;
-        try {
-            ChannelFuture future = c.writeAndFlush(readRequest);
-            future.addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        if (future.isSuccess()) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Successfully wrote request {} to {}",
-                                          readRequest, c.remoteAddress());
-                            }
-                        } else {
-                            if (!(future.cause() instanceof ClosedChannelException)) {
-                                LOG.warn("Writing readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ",
-                                        new Object[] { ledgerId, entryId, c, future.cause() });
-                            }
-                            errorOutReadKey(completionKey);
-                        }
-                    }
-                });
-        } catch(Throwable e) {
-            LOG.warn("Read entry operation {} failed", completionKey, e);
-            errorOutReadKey(completionKey);
-        }
+        writeAndFlush(channel, completionKey, request);
     }
 
     public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
         Object request = null;
-        CompletionKey completion = null;
+        CompletionKey completionKey = null;
         if (useV2WireProtocol) {
             request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                     ledgerId, 0, (short) 0);
-            completion = new V2CompletionKey(ledgerId, 0, OperationType.READ_LAC);
+            completionKey = new V2CompletionKey(ledgerId, 0, OperationType.READ_LAC);
         } else {
             final long txnId = getTxnId();
-            completion = new V3CompletionKey(txnId, OperationType.READ_LAC);
+            completionKey = new V3CompletionKey(txnId, OperationType.READ_LAC);
 
             // Build the request and calculate the total size to be included in the packet.
             BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -716,40 +640,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     .setReadLacRequest(readLacBuilder)
                     .build();
         }
-        final Object readLacRequest = request;
-        final CompletionKey completionKey = completion;
-
         completionObjects.put(completionKey,
-                new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId,
-                        scheduleTimeout(completionKey, readEntryTimeout)));
-        final Channel c = channel;
-        if (c == null) {
-            errorOutReadLacKey(completionKey);
-            return;
-        }
-
-        try {
-            ChannelFuture future = c.writeAndFlush(readLacRequest);
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Succssfully wrote request {} to {}", readLacRequest, c.remoteAddress());
-                        }
-                    } else {
-                        if (!(future.cause() instanceof ClosedChannelException)) {
-                            LOG.warn("Writing readLac(lid = {}) to channel {} failed : ",
-                                    new Object[] { ledgerId, c, future.cause() });
-                        }
-                        errorOutReadLacKey(completionKey);
-                    }
-                }
-            });
-        } catch(Throwable e) {
-            LOG.warn("Read LAC operation {} failed", readLacRequest, e);
-            errorOutReadLacKey(completionKey);
-        }
+                              new ReadLacCompletion(completionKey, cb,
+                                                    ctx, ledgerId));
+        writeAndFlush(channel, completionKey, request);
     }
 
     /**
@@ -783,14 +677,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                    final ReadEntryCallback cb,
                                    final Object ctx) {
         Object request = null;
-        CompletionKey completion = null;
+        CompletionKey completionKey = null;
         if (useV2WireProtocol) {
             request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
                     ledgerId, entryId, (short) 0);
-            completion = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
+            completionKey = new V2CompletionKey(ledgerId, entryId, OperationType.READ_ENTRY);
         } else {
             final long txnId = getTxnId();
-            completion = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
+            completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
 
             // Build the request and calculate the total size to be included in the packet.
             BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -831,49 +725,19 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     .setReadRequest(readBuilder)
                     .build();
         }
-        final Object readRequest = request;
-        final CompletionKey completionKey = completion;
 
         completionObjects.put(completionKey,
-                new ReadCompletion(this, readEntryOpLogger, cb, ctx, ledgerId, entryId,
-                        scheduleTimeout(completionKey, readEntryTimeout)));
-        final Channel c = channel;
-        if (c == null) {
-            errorOutReadKey(completionKey);
-            return;
-        }
-
-        try{
-            ChannelFuture future = c.writeAndFlush(readRequest);
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Successfully wrote request {} to {}",
-                                      readRequest, c.remoteAddress());
-                        }
-                    } else {
-                        if (!(future.cause() instanceof ClosedChannelException)) {
-                            LOG.warn("Writing readEntry(lid={}, eid={}) to channel {} failed : ",
-                                    new Object[] { ledgerId, entryId, c, future.cause() });
-                        }
-                        errorOutReadKey(completionKey);
-                    }
-                }
-            });
-        } catch(Throwable e) {
-            LOG.warn("Read entry operation {} failed", readRequest, e);
-            errorOutReadKey(completionKey);
-        }
+                              new ReadCompletion(completionKey, cb,
+                                                 ctx, ledgerId, entryId));
+        writeAndFlush(channel, completionKey, request);
     }
 
     public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_BOOKIE_INFO);
         completionObjects.put(completionKey,
-                new GetBookieInfoCompletion(this, getBookieInfoOpLogger, cb, ctx,
-                                   scheduleTimeout(completionKey, getBookieInfoTimeout)));
+                              new GetBookieInfoCompletion(
+                                      completionKey, cb, ctx));
 
         // Build the request and calculate the total size to be included in the packet.
         BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
@@ -889,35 +753,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                 .setGetBookieInfoRequest(getBookieInfoBuilder)
                 .build();
 
-        final Channel c = channel;
-        if (c == null) {
-            errorOutGetBookieInfoKey(completionKey);
-            return;
-        }
-
-        try{
-            ChannelFuture future = c.writeAndFlush(getBookieInfoRequest);
-            future.addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Successfully wrote request {} to {}",
-                                    getBookieInfoRequest, c.remoteAddress());
-                        }
-                    } else {
-                        if (!(future.cause() instanceof ClosedChannelException)) {
-                            LOG.warn("Writing GetBookieInfoRequest(flags={}) to channel {} failed : ",
-                                    new Object[] { requested, c, future.cause() });
-                        }
-                        errorOutGetBookieInfoKey(completionKey);
-                    }
-                }
-            });
-        } catch(Throwable e) {
-            LOG.warn("Get metadata operation {} failed", getBookieInfoRequest, e);
-            errorOutGetBookieInfoKey(completionKey);
-        }
+        writeAndFlush(channel, completionKey, getBookieInfoRequest);
     }
 
     /**
@@ -980,163 +816,72 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         return c.close();
     }
 
-    void errorStartTLS(int rc) {
-        failTLS(rc);
-    }
-
-    void errorOutReadKey(final CompletionKey key) {
-        errorOutReadKey(key, BKException.Code.BookieHandleNotAvailableException);
-    }
-
-    void errorOutReadKey(final CompletionKey key, final int rc) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Removing completion key: {}", key);
-        }
-        final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
-        if (null == readCompletion) {
+    private void writeAndFlush(final Channel channel,
+                               final CompletionKey key,
+                               final Object request) {
+        if (channel == null) {
+            errorOut(key);
             return;
         }
-        executor.submitOrdered(readCompletion.ledgerId, new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                String bAddress = "null";
-                Channel c = channel;
-                if (c != null && c.remoteAddress() != null) {
-                    bAddress = c.remoteAddress().toString();
-                }
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {} rc: {}",
-                            new Object[] { readCompletion.entryId, readCompletion.ledgerId, bAddress, rc });
-                }
-
-                readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId,
-                                                    null, readCompletion.ctx);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("ErrorOutReadKey(%s)", key);
-            }
-        });
+        try{
+            channel.writeAndFlush(request)
+                .addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future)
+                                throws Exception {
+                            if (future.isSuccess()) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Successfully wrote request {} to {}",
+                                              requestToString(request),
+                                              channel.remoteAddress());
+                                }
+                            } else {
+                                if (!(future.cause()
+                                      instanceof ClosedChannelException)) {
+                                    LOG.warn("Writing request {} to {} failed : ",
+                                             requestToString(request),
+                                             channel, future.cause());
+                                }
+                                errorOut(key);
+                            }
+                        }
+                    });
+        } catch(Throwable e) {
+            LOG.warn("Operation {} failed", requestToString(request), e);
+            errorOut(key);
+        }
     }
 
-    void errorOutWriteLacKey(final CompletionKey key) {
-        errorOutWriteLacKey(key, BKException.Code.BookieHandleNotAvailableException);
+    private static String requestToString(Object request) {
+        if (request instanceof BookkeeperProtocol.Request) {
+            BookkeeperProtocol.BKPacketHeader header
+                = ((BookkeeperProtocol.Request)request).getHeader();
+            return String.format("Req(txnId=%d,op=%s,version=%s)",
+                                 header.getTxnId(), header.getOperation(),
+                                 header.getVersion());
+        } else {
+            return request.toString();
+        }
     }
-
-    void errorOutWriteLacKey(final CompletionKey key, final int rc) {
+    void errorOut(final CompletionKey key) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Removing completion key: {}", key);
         }
-        final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key);
-        if (null == writeLacCompletion) {
-            return;
+        CompletionValue completion = completionObjects.remove(key);
+        if (completion != null) {
+            completion.errorOut();
         }
-        executor.submitOrdered(writeLacCompletion.ledgerId, new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                String bAddress = "null";
-                Channel c = channel;
-                if (c != null) {
-                    bAddress = c.remoteAddress().toString();
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Could not write request writeLac for ledgerId: {} bookie: {}",
-                            writeLacCompletion.ledgerId, bAddress);
-                }
-                writeLacCompletion.cb.writeLacComplete(rc, writeLacCompletion.ledgerId, addr, writeLacCompletion.ctx);
-            }
-        });
     }
 
-    void errorOutReadLacKey(final CompletionKey key) {
-        errorOutReadLacKey(key, BKException.Code.BookieHandleNotAvailableException);
-    }
-
-    void errorOutReadLacKey(final CompletionKey key, final int rc) {
+    void errorOut(final CompletionKey key, final int rc) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Removing completion key: {}", key);
         }
-        final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key);
-        if (null == readLacCompletion) {
-            return;
-        }
-        executor.submitOrdered(readLacCompletion.ledgerId, new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                String bAddress = "null";
-                Channel c = channel;
-                if (c != null) {
-                    bAddress = c.remoteAddress().toString();
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Could not write request readLac for ledgerId: {} bookie: {}", readLacCompletion.ledgerId,
-                            bAddress);
-                }
-                readLacCompletion.cb.readLacComplete(rc, readLacCompletion.ledgerId, null, null, readLacCompletion.ctx);
-            }
-        });
-    }
-
-    void errorOutAddKey(final CompletionKey key) {
-        errorOutAddKey(key, BKException.Code.BookieHandleNotAvailableException);
-    }
-
-    void errorOutAddKey(final CompletionKey key, final int rc) {
-        final AddCompletion addCompletion = (AddCompletion)completionObjects.remove(key);
-        if (null == addCompletion) {
-            return;
-        }
-        executor.submitOrdered(addCompletion.ledgerId, new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                String bAddress = "null";
-                Channel c = channel;
-                if (c != null && c.remoteAddress() != null) {
-                    bAddress = c.remoteAddress().toString();
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}",
-                            new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress, rc });
-                }
-
-                addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId,
-                                               addr, addCompletion.ctx);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Invoked callback method: {}", addCompletion.entryId);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("ErrorOutAddKey(%s)", key);
-            }
-        });
-    }
-
-    void errorOutGetBookieInfoKey(final CompletionKey key) {
-        errorOutGetBookieInfoKey(key, BKException.Code.BookieHandleNotAvailableException);
-    }
-
-    void errorOutGetBookieInfoKey(final CompletionKey key, final int rc) {
-        final GetBookieInfoCompletion getBookieInfoCompletion = (GetBookieInfoCompletion)completionObjects.remove(key);
-        if (null == getBookieInfoCompletion) {
-            return;
+        CompletionValue completion = completionObjects.remove(key);
+        if (completion != null) {
+            completion.errorOut(rc);
         }
-        executor.submit(new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                String bAddress = "null";
-                Channel c = channel;
-                if (c != null) {
-                    bAddress = c.remoteAddress().toString();
-                }
-                LOG.debug("Could not write getBookieInfo request for bookie: {}", new Object[] {bAddress});
-                getBookieInfoCompletion.cb.getBookieInfoComplete(rc, new BookieInfo(), getBookieInfoCompletion.ctx);
-            }
-        });
     }
 
     /**
@@ -1155,22 +900,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         // successfully removes the key from the map is the one responsible for
         // calling the application callback.
         for (CompletionKey key : completionObjects.keySet()) {
-            switch (key.operationType) {
-                case ADD_ENTRY:
-                    errorOutAddKey(key, rc);
-                    break;
-                case READ_ENTRY:
-                    errorOutReadKey(key, rc);
-                    break;
-                case GET_BOOKIE_INFO:
-                    errorOutGetBookieInfoKey(key, rc);
-                    break;
-                case START_TLS:
-                    errorStartTLS(rc);
-                    break;
-                default:
-                    break;
-            }
+            errorOut(key, rc);
         }
     }
 
@@ -1261,7 +991,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             Response response = (Response) msg;
             readV3Response(response);
         } else {
-        	ctx.fireChannelRead(msg);
+            ctx.fireChannelRead(msg);
         }
     }
 
@@ -1284,28 +1014,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             long orderingKey = completionValue.ledgerId;
 
             executor.submitOrdered(orderingKey, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    switch (operationType) {
-                        case ADD_ENTRY: {
-                            handleAddResponse(ledgerId, entryId, status, completionValue);
-                            break;
-                        }
-                        case READ_ENTRY: {
-                            BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
-                            ByteBuf data = null;
-                            if (readResponse.hasData()) {
-                              data = readResponse.getData();
-                            }
-                            handleReadResponse(ledgerId, entryId, status, data, INVALID_ENTRY_ID, -1L, completionValue);
-                            break;
-                        }
-                        default:
-                            LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", operationType, addr);
-                            break;
+                    @Override
+                    public void safeRun() {
+                        completionValue.handleV2Response(ledgerId, entryId,
+                                                         status, response);
                     }
-                }
-            });
+                });
         }
     }
 
@@ -1370,71 +1084,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             executor.submitOrdered(orderingKey, new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    OperationType type = header.getOperation();
-                    switch (type) {
-                        case ADD_ENTRY: {
-                            AddResponse addResponse = response.getAddResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
-                            handleAddResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status, completionValue);
-                            break;
-                        }
-                        case READ_ENTRY: {
-                            ReadResponse readResponse = response.getReadResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
-                            ByteBuf buffer = Unpooled.EMPTY_BUFFER;
-                            if (readResponse.hasBody()) {
-                                buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
-                            }
-                            long maxLAC = INVALID_ENTRY_ID;
-                            if (readResponse.hasMaxLAC()) {
-                                maxLAC = readResponse.getMaxLAC();
-                            }
-                            long lacUpdateTimestamp = -1L;
-                            if (readResponse.hasLacUpdateTimestamp()) {
-                                lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
-                            }
-                            handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, maxLAC, lacUpdateTimestamp, completionValue);
-                            break;
-                        }
-                        case WRITE_LAC: {
-                            WriteLacResponse writeLacResponse = response.getWriteLacResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? writeLacResponse.getStatus() : response.getStatus();
-                            handleWriteLacResponse(writeLacResponse.getLedgerId(), status, completionValue);
-                            break;
-                        }
-                        case READ_LAC: {
-                            ReadLacResponse readLacResponse = response.getReadLacResponse();
-                            ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
-                            ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
-                            // Thread.dumpStack();
-
-                            if (readLacResponse.hasLacBody()) {
-                                lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
-                            }
-
-                            if (readLacResponse.hasLastEntryBody()) {
-                                lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
-                            }
-                            handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue);
-                            break;
-                        }
-                        case GET_BOOKIE_INFO: {
-                            GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
-                            StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus();
-                            handleGetBookieInfoResponse(getBookieInfoResponse.getFreeDiskSpace(), getBookieInfoResponse.getTotalDiskCapacity(), status, completionValue);
-                            break;
-                        }
-                        case START_TLS: {
-                            StatusCode status = response.getStatus();
-                            handleStartTLSResponse(status, completionValue);
-                            break;
-                        }
-                        default:
-                            LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
-                                      type, addr);
-                            break;
-                    }
+                    completionValue.handleV3Response(response);
                 }
 
                 @Override
@@ -1447,37 +1097,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
     }
 
-    void handleStartTLSResponse(StatusCode status, CompletionValue completionValue) {
-        StartTLSCompletion tlsCompletion = (StartTLSCompletion) completionValue;
-
-        // convert to BKException code because thats what the upper
-        // layers expect. This is UGLY, there should just be one set of
-        // error codes.
-        Integer rcToRet = statusCodeToExceptionCode(status);
-        if (null == rcToRet) {
-            LOG.error("START_TLS failed on bookie:{}", addr);
-            rcToRet = BKException.Code.SecurityException;
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Received START_TLS response from {} rc: {}", addr, rcToRet);
-            }
-        }
-
-        // Cancel START_TLS request timeout
-        tlsCompletion.cb.startTLSComplete(rcToRet, tlsCompletion.ctx);
-
-        if (state != ConnectionState.START_TLS) {
-            LOG.error("Connection state changed before TLS response received");
-            failTLS(BKException.Code.BookieHandleNotAvailableException);
-        } else if (status != StatusCode.EOK) {
-            LOG.error("Client received error {} during TLS negotiation", status);
-            failTLS(BKException.Code.SecurityException);
-        } else {
-            // create TLS handler
-            PerChannelBookieClient parentObj = PerChannelBookieClient.this;
-            SslHandler handler = parentObj.shFactory.newTLSHandler();
-            channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
-            handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+    void initTLSHandshake() {
+        // create TLS handler
+        PerChannelBookieClient parentObj = PerChannelBookieClient.this;
+        SslHandler handler = parentObj.shFactory.newTLSHandler();
+        channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
+        handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
                 @Override
                 public void operationComplete(Future<Channel> future) throws Exception {
                     int rc;
@@ -1536,361 +1161,487 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     }
                 }
             });
-        }
     }
 
-    void handleWriteLacResponse(long ledgerId, StatusCode status, CompletionValue completionValue) {
-        // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
-        WriteLacCompletion plc = (WriteLacCompletion)completionValue;
+    /**
+     * Boiler-plate wrapper classes follow
+     *
+     */
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: "
-                    + status);
+    // visible for testing
+    abstract class CompletionValue {
+        final Object ctx;
+        protected final long ledgerId;
+        protected final long entryId;
+        private final long startTime;
+        private final OpStatsLogger opLogger;
+        private final OpStatsLogger timeoutOpLogger;
+        protected final Timeout timeout;
+        private final String operationName;
+
+        public CompletionValue(String operationName,
+                               Object ctx,
+                               long ledgerId, long entryId,
+                               OpStatsLogger opLogger,
+                               OpStatsLogger timeoutOpLogger,
+                               Timeout timeout) {
+            this.operationName = operationName;
+            this.ctx = ctx;
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.startTime = MathUtils.nowInNano();
+            this.opLogger = opLogger;
+            this.timeoutOpLogger = timeoutOpLogger;
+            this.timeout = timeout;
         }
 
-        // convert to BKException code
-        Integer rcToRet = statusCodeToExceptionCode(status);
-        if (null == rcToRet) {
-            LOG.error("writeLac for ledger: " + ledgerId + " failed on bookie: " + addr
-                        + " with code:" + status);
-            rcToRet = BKException.Code.WriteException;
+        private long latency() {
+            return MathUtils.elapsedNanos(startTime);
         }
-        plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx);
-    }
 
- void handleAddResponse(long ledgerId, long entryId, StatusCode status, CompletionValue completionValue) {
-        // The completion value should always be an instance of an AddCompletion object when we reach here.
-        AddCompletion ac = (AddCompletion)completionValue;
+        void cancelTimeoutAndLogOp(int rc) {
+            if (null != timeout) {
+                timeout.cancel();
+            }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
-                    + entryId + " rc: " + status);
-        }
-        // convert to BKException code because thats what the upper
-        // layers expect. This is UGLY, there should just be one set of
-        // error codes.
-        Integer rcToRet = statusCodeToExceptionCode(status);
-        if (null == rcToRet) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
-                        + " with code:" + status);
+            if (rc != BKException.Code.OK) {
+                opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
+            } else {
+                opLogger.registerSuccessfulEvent(latency(), TimeUnit.NANOSECONDS);
             }
-            rcToRet = BKException.Code.WriteException;
-        }
-        ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
-    }
 
-    void handleReadLacResponse(long ledgerId, StatusCode status, ByteBuf lacBuffer, ByteBuf lastEntryBuffer, CompletionValue completionValue) {
-        // The completion value should always be an instance of an WriteLacCompletion object when we reach here.
-        ReadLacCompletion glac = (ReadLacCompletion)completionValue;
+            if (rc != BKException.Code.OK
+                && !expectedBkOperationErrors.contains(rc)) {
+                recordError();
+            }
+        }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: "
-                    + status);
+        void timeout() { // fails this completion with TimeoutException and records the elapsed time
+            errorOut(BKException.Code.TimeoutException);
+            timeoutOpLogger.registerSuccessfulEvent(latency(), // NOTE(review): recorded as a "successful" event on the timeout logger - confirm intended
+                                                    TimeUnit.NANOSECONDS);
         }
-        // convert to BKException code
-        Integer rcToRet = statusCodeToExceptionCode(status);
-        if (null == rcToRet) {
+
+        protected int logAndConvertStatus(StatusCode status, int defaultStatus,
+                                          Object... extraInfo) { // extraInfo is only joined into the debug line below
             if (LOG.isDebugEnabled()) {
-                LOG.debug("readLac for ledger: " + ledgerId + " failed on bookie: " + addr + " with code:" + status);
+                LOG.debug("Got {} response from bookie:{} rc:{}, {}",
+                          operationName, addr, status,
+                          Joiner.on(":").join(extraInfo));
             }
-            rcToRet = BKException.Code.ReadException;
-        }
-        glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx);
-    }
-
-    void handleReadResponse(long ledgerId,
-                            long entryId,
-                            StatusCode status,
-                            ByteBuf buffer,
-                            long maxLAC, // max known lac piggy-back from bookies
-                            long lacUpdateTimestamp, // the timestamp when the lac is updated.
-                            CompletionValue completionValue) {
-        // The completion value should always be an instance of a ReadCompletion object when we reach here.
-        ReadCompletion rc = (ReadCompletion)completionValue;
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
-                    + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes());
+            // convert to BKException code
+            Integer rcToRet = statusCodeToExceptionCode(status);
+            if (null == rcToRet) { // no BKException mapping for this status
+                LOG.error("{} failed on bookie {} code {}",
+                          operationName, addr, status);
+                return defaultStatus; // fall back to the caller-supplied code
+            } else {
+                return rcToRet;
+            }
         }
 
-        // convert to BKException code because thats what the uppper
-        // layers expect. This is UGLY, there should just be one set of
-        // error codes.
-        Integer rcToRet = statusCodeToExceptionCode(status);
-        if (null == rcToRet) {
-            LOG.error("Read entry for ledger:{}, entry:{} failed on bookie:{} with code:{}",
-                      new Object[] { ledgerId, entryId, addr, status });
-            rcToRet = BKException.Code.ReadException;
-        }
-        if(buffer != null) {
-            buffer = buffer.slice();
-        }
-        if (maxLAC > INVALID_ENTRY_ID && (rc.ctx instanceof ReadEntryCallbackCtx)) {
-            ((ReadEntryCallbackCtx) rc.ctx).setLastAddConfirmed(maxLAC);
-        }
-        if (lacUpdateTimestamp > -1L && (rc.ctx instanceof ReadLastConfirmedAndEntryContext)) {
-            ((ReadLastConfirmedAndEntryContext) rc.ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
-        }
-        rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer, rc.ctx);
-    }
 
-    void handleGetBookieInfoResponse(long freeDiskSpace, long totalDiskCapacity,  StatusCode status, CompletionValue completionValue) {
-        // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here.
-        GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue;
+        public abstract void errorOut();
+        public abstract void errorOut(final int rc);
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc);
+        protected void errorOutAndRunCallback(final Runnable callback) {
+            executor.submitOrdered(ledgerId, // ledgerId is the ordering key for the submitted task
+                    new SafeRunnable() {
+                        @Override
+                        public void safeRun() {
+                            String bAddress = "null";
+                            Channel c = channel; // read the field once so the null check and the use see the same object
+                            if (c != null && c.remoteAddress() != null) {
+                                bAddress = c.remoteAddress().toString();
+                            }
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Could not write {} request to bookie {} for ledger {}, entry {}",
+                                          operationName, bAddress,
+                                          ledgerId, entryId);
+                            }
+                            callback.run(); // always runs, whether or not debug logging is enabled
+                        }
+                    });
         }
 
-        // convert to BKException code because thats what the upper
-        // layers expect. This is UGLY, there should just be one set of
-        // error codes.
-        Integer rcToRet = statusCodeToExceptionCode(status);
-        if (null == rcToRet) {
-            LOG.error("Read metadata failed on bookie:{} with code:{}",
-                      new Object[] { addr, status });
-            rcToRet = BKException.Code.ReadException;
+        public void handleV2Response(
+                long ledgerId, long entryId, StatusCode status,
+                BookieProtocol.Response response) {
+            LOG.warn("Unhandled V2 response {}", response);
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Response received from bookie info read: freeDiskSpace=" + freeDiskSpace + " totalDiskSpace:"
-                    + totalDiskCapacity);
-        }
-        rc.cb.getBookieInfoComplete(rcToRet, new BookieInfo(totalDiskCapacity, freeDiskSpace), rc.ctx);
+        public abstract void handleV3Response(
+                BookkeeperProtocol.Response response);
     }
 
-    /**
-     * Boiler-plate wrapper classes follow
-     *
-     */
-
     // visible for testing
-    static abstract class CompletionValue {
-        final Object ctx;
-        protected final long ledgerId;
-        protected final long entryId;
-        protected final Timeout timeout;
+    class WriteLacCompletion extends CompletionValue {
+        final WriteLacCallback cb;
 
-        public CompletionValue(Object ctx, long ledgerId, long entryId,
-                               Timeout timeout) {
-            this.ctx = ctx;
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-            this.timeout = timeout;
+        public WriteLacCompletion(CompletionKey key,
+                                  final WriteLacCallback originalCallback,
+                                  final Object originalCtx,
+                                  final long ledgerId) {
+            super("WriteLAC",
+                  originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+                  writeLacOpLogger, writeLacTimeoutOpLogger,
+                  scheduleTimeout(key, addEntryTimeout));
+            this.cb = new WriteLacCallback() {
+                    @Override
+                    public void writeLacComplete(int rc, long ledgerId,
+                                                 BookieSocketAddress addr,
+                                                 Object ctx) {
+                        cancelTimeoutAndLogOp(rc);
+                        originalCallback.writeLacComplete(rc, ledgerId,
+                                                          addr, originalCtx);
+                    }
+                };
         }
 
-        void cancelTimeout() {
-            if (null != timeout) {
-                timeout.cancel();
-            }
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
         }
-    }
 
-    // visible for testing
-    static class WriteLacCompletion extends CompletionValue {
-        final WriteLacCallback cb;
-
-        public WriteLacCompletion(WriteLacCallback cb, Object ctx, long ledgerId) {
-            this(null, cb, ctx, ledgerId, null);
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(
+                    () -> cb.writeLacComplete(rc, ledgerId, addr, ctx));
         }
 
-        public WriteLacCompletion(final OpStatsLogger writeLacOpLogger, final WriteLacCallback originalCallback,
-                final Object originalCtx, final long ledgerId, final Timeout timeout) {
-            super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
-            final long startTime = MathUtils.nowInNano();
-            this.cb = null == writeLacOpLogger ? originalCallback : new WriteLacCallback() {
-                @Override
-                public void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
-                    cancelTimeout();
-                    long latency = MathUtils.elapsedNanos(startTime);
-                    if (rc != BKException.Code.OK) {
-                        writeLacOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                    } else {
-                        writeLacOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
-                    }
-                    originalCallback.writeLacComplete(rc, ledgerId, addr, originalCtx);
-                }
-            };
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            WriteLacResponse writeLacResponse = response.getWriteLacResponse();
+            StatusCode status = response.getStatus() == StatusCode.EOK ?
+                writeLacResponse.getStatus() : response.getStatus();
+            long ledgerId = writeLacResponse.getLedgerId();
 
+            int rc = logAndConvertStatus(status,
+                                         BKException.Code.WriteException,
+                                         "ledger", ledgerId);
+            cb.writeLacComplete(rc, ledgerId, addr, ctx);
         }
     }
 
     // visible for testing
-    static class ReadLacCompletion extends CompletionValue {
+    class ReadLacCompletion extends CompletionValue {
         final ReadLacCallback cb;
 
-        public ReadLacCompletion(ReadLacCallback cb, Object ctx, long ledgerId) {
-            this (null, cb, ctx, ledgerId, null);
+        public ReadLacCompletion(CompletionKey key,
+                                 ReadLacCallback originalCallback,
+                                 final Object ctx, final long ledgerId) {
+            super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
+                  readLacOpLogger, readLacTimeoutOpLogger,
+                  scheduleTimeout(key, readEntryTimeout));
+            this.cb = new ReadLacCallback() {
+                    @Override
+                    public void readLacComplete(int rc, long ledgerId,
+                                                ByteBuf lacBuffer,
+                                                ByteBuf lastEntryBuffer,
+                                                Object ctx) {
+                        cancelTimeoutAndLogOp(rc);
+                        originalCallback.readLacComplete(
+                                rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
+                    }
+                };
         }
 
-        public ReadLacCompletion(final OpStatsLogger readLacOpLogger, final ReadLacCallback originalCallback,
-                final Object ctx, final long ledgerId, final Timeout timeout) {
-            super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout);
-            final long startTime = MathUtils.nowInNano();
-            this.cb = null == readLacOpLogger ? originalCallback : new ReadLacCallback() {
-                @Override
-                public void readLacComplete(int rc, long ledgerId, ByteBuf lacBuffer, ByteBuf lastEntryBuffer,
-                        Object ctx) {
-                    cancelTimeout();
-                    long latency = MathUtils.elapsedNanos(startTime);
-                    if (rc != BKException.Code.OK) {
-                        readLacOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                    } else {
-                        readLacOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
-                    }
-                    originalCallback.readLacComplete(rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
-                }
-            };
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
+
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(
+                    () -> cb.readLacComplete(rc, ledgerId, null, null, ctx));
+        }
+
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            ReadLacResponse readLacResponse = response.getReadLacResponse();
+            ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
+            ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
+            StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
+
+            if (readLacResponse.hasLacBody()) {
+                lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+            }
+
+            if (readLacResponse.hasLastEntryBody()) {
+                lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+            }
+
+            int rc = logAndConvertStatus(status,
+                                         BKException.Code.ReadException,
+                                         "ledger", ledgerId);
+            cb.readLacComplete(rc, ledgerId, lacBuffer.slice(),
+                               lastEntryBuffer.slice(), ctx);
         }
     }
 
     // visible for testing
-    static class ReadCompletion extends CompletionValue {
+    class ReadCompletion extends CompletionValue {
         final ReadEntryCallback cb;
 
-        public ReadCompletion(final PerChannelBookieClient pcbc, ReadEntryCallback cb, Object ctx,
-                              long ledgerId, long entryId) {
-            this(pcbc, null, cb, ctx, ledgerId, entryId, null);
-        }
-
-        public ReadCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger readEntryOpLogger,
+        public ReadCompletion(CompletionKey key,
                               final ReadEntryCallback originalCallback,
-                              final Object originalCtx, final long ledgerId, final long entryId,
-                              final Timeout timeout) {
-            super(originalCtx, ledgerId, entryId, timeout);
-            final long startTime = MathUtils.nowInNano();
+                              final Object originalCtx,
+                              long ledgerId, final long entryId) {
+            super("Read", originalCtx, ledgerId, entryId,
+                  readEntryOpLogger, readTimeoutOpLogger,
+                  scheduleTimeout(key, readEntryTimeout));
+
             this.cb = new ReadEntryCallback() {
-                @Override
-                public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
-                    cancelTimeout();
-                    if (readEntryOpLogger != null) {
-                        long latency = MathUtils.elapsedNanos(startTime);
-                        if (rc != BKException.Code.OK) {
-                            readEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                        } else {
-                            readEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
-                        }
+                    @Override
+                    public void readEntryComplete(int rc, long ledgerId,
+                                                  long entryId, ByteBuf buffer,
+                                                  Object ctx) {
+                        cancelTimeoutAndLogOp(rc);
+                        originalCallback.readEntryComplete(rc,
+                                                           ledgerId, entryId,
+                                                           buffer, originalCtx);
                     }
+                };
+        }
 
-                    if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
-                        pcbc.recordError();
-                    }
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
 
-                    originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx);
-                }
-            };
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(
+                    () -> cb.readEntryComplete(rc, ledgerId,
+                                               entryId, null, ctx));
         }
-    }
 
-    static class StartTLSCompletion extends CompletionValue {
-        final StartTLSCallback cb;
+        @Override
+        public void handleV2Response(long ledgerId, long entryId,
+                                     StatusCode status,
+                                     BookieProtocol.Response response) {
+            if (!(response instanceof BookieProtocol.ReadResponse)) {
+                return; // NOTE(review): unexpected response type is dropped with no log and no callback - confirm intended
+            }
+            BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
+            ByteBuf data = null; // stays null when the response carries no data
+            if (readResponse.hasData()) {
+                data = readResponse.getData();
+            }
+            handleReadResponse(ledgerId, entryId, status, data,
+                               INVALID_ENTRY_ID, -1L); // passes INVALID_ENTRY_ID as maxLAC and -1 as lacUpdateTimestamp
+        }
 
-        public StartTLSCompletion(final PerChannelBookieClient pcbc, StartTLSCallback cb, Object ctx) {
-            this(pcbc, null, cb, ctx, null);
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            ReadResponse readResponse = response.getReadResponse();
+            StatusCode status = response.getStatus() == StatusCode.EOK
+                ? readResponse.getStatus() : response.getStatus();
+            ByteBuf buffer = Unpooled.EMPTY_BUFFER;
+            if (readResponse.hasBody()) {
+                buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+            }
+            long maxLAC = INVALID_ENTRY_ID;
+            if (readResponse.hasMaxLAC()) {
+                maxLAC = readResponse.getMaxLAC();
+            }
+            long lacUpdateTimestamp = -1L;
+            if (readResponse.hasLacUpdateTimestamp()) {
+                lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
+            }
+            handleReadResponse(readResponse.getLedgerId(),
+                               readResponse.getEntryId(),
+                               status, buffer, maxLAC, lacUpdateTimestamp);
+        }
+
+        private void handleReadResponse(long ledgerId,
+                                        long entryId,
+                                        StatusCode status,
+                                        ByteBuf buffer,
+                                        long maxLAC, // max known lac piggy-back from bookies
+                                        long lacUpdateTimestamp) { // the timestamp when the lac is updated.
+            int readableBytes = buffer == null ? 0 : buffer.readableBytes();
+            int rc = logAndConvertStatus(status,
+                                         BKException.Code.ReadException,
+                                         "ledger", ledgerId,
+                                         "entry", entryId,
+                                         "entryLength", readableBytes);
+
+            if(buffer != null) {
+                buffer = buffer.slice();
+            }
+            if (maxLAC > INVALID_ENTRY_ID
+                && (ctx instanceof ReadEntryCallbackCtx)) {
+                ((ReadEntryCallbackCtx)ctx).setLastAddConfirmed(maxLAC);
+            }
+            if (lacUpdateTimestamp > -1L
+                && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
+                ((ReadLastConfirmedAndEntryContext)ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+            }
+            cb.readEntryComplete(rc, ledgerId, entryId, buffer, ctx);
         }
+    }
+
+    class StartTLSCompletion extends CompletionValue {
+        final StartTLSCallback cb;
 
-        public StartTLSCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger startTLSOpLogger,
-                                  final StartTLSCallback originalCallback, final Object originalCtx, final Timeout timeout) {
-            super(originalCtx, -1, -1, timeout);
-            final long startTime = MathUtils.nowInNano();
+        public StartTLSCompletion(CompletionKey key) {
+            super("StartTLS", null, -1, -1,
+                  startTLSOpLogger, startTLSTimeoutOpLogger,
+                  scheduleTimeout(key, startTLSTimeout));
             this.cb = new StartTLSCallback() {
                 @Override
                 public void startTLSComplete(int rc, Object ctx) {
-                    cancelTimeout();
-                    if (startTLSOpLogger != null) {
-                        long latency = MathUtils.elapsedNanos(startTime);
-                        if (rc != BKException.Code.OK) {
-                            startTLSOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                        } else {
-                            startTLSOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
-                        }
-                    }
-
-                    if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
-                        pcbc.recordError();
-                    }
-
-                    if (originalCallback != null) {
-                        originalCallback.startTLSComplete(rc, originalCtx);
-                    }
+                    cancelTimeoutAndLogOp(rc);
                 }
             };
         }
+
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
+
+        @Override
+        public void errorOut(final int rc) {
+            failTLS(rc);
+        }
+
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            StatusCode status = response.getStatus();
+
+            int rc = logAndConvertStatus(status,
+                                         BKException.Code.SecurityException);
+
+            // Cancel START_TLS request timeout
+            cb.startTLSComplete(rc, null);
+
+            if (state != ConnectionState.START_TLS) {
+                LOG.error("Connection state changed before TLS response received");
+                failTLS(BKException.Code.BookieHandleNotAvailableException);
+            } else if (status != StatusCode.EOK) {
+                LOG.error("Client received error {} during TLS negotiation", status);
+                failTLS(BKException.Code.SecurityException);
+            } else {
+                initTLSHandshake();
+            }
+        }
+
     }
 
     // visible for testing
-    static class GetBookieInfoCompletion extends CompletionValue {
+    class GetBookieInfoCompletion extends CompletionValue {
         final GetBookieInfoCallback cb;
 
-        public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, GetBookieInfoCallback cb, Object ctx) {
-            this(pcbc, null, cb, ctx, null);
+        public GetBookieInfoCompletion(CompletionKey key,
+                                       final GetBookieInfoCallback origCallback,
+                                       final Object origCtx) {
+            super("GetBookieInfo", origCtx, 0L, 0L,
+                  getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger,
+                  scheduleTimeout(key, getBookieInfoTimeout));
+            this.cb = new GetBookieInfoCallback() {
+                @Override
+                public void getBookieInfoComplete(int rc, BookieInfo bInfo,
+                                                  Object ctx) {
+                    cancelTimeoutAndLogOp(rc);
+                    origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
+                }
+            };
         }
 
-        public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger getBookieInfoOpLogger,
-                              final GetBookieInfoCallback originalCallback,
-                              final Object originalCtx, final Timeout timeout) {
-            super(originalCtx, 0L, 0L, timeout);
-            final long startTime = MathUtils.nowInNano();
-            this.cb = (null == getBookieInfoOpLogger) ? originalCallback : new GetBookieInfoCallback() {
-                @Override
-                public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
-                    cancelTimeout();
-                    if (getBookieInfoOpLogger != null) {
-                        long latency = MathUtils.elapsedNanos(startTime);
-                        if (rc != BKException.Code.OK) {
-                            getBookieInfoOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                        } else {
-                            getBookieInfoOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
-                        }
-                    }
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
 
-                    if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
-                        pcbc.recordError();
-                    }
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(
+                    () -> cb.getBookieInfoComplete(rc, new BookieInfo(), ctx));
+        }
 
-                    originalCallback.getBookieInfoComplete(rc, bInfo, originalCtx);
-                }
-            };
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            GetBookieInfoResponse getBookieInfoResponse
+                = response.getGetBookieInfoResponse();
+            StatusCode status = response.getStatus() == StatusCode.EOK
+                ? getBookieInfoResponse.getStatus() : response.getStatus();
+
+            long freeDiskSpace = getBookieInfoResponse.getFreeDiskSpace();
+            long totalDiskSpace = getBookieInfoResponse.getTotalDiskCapacity();
+
+            int rc = logAndConvertStatus(status,
+                                         BKException.Code.ReadException,
+                                         "freeDisk", freeDiskSpace,
+                                         "totalDisk", totalDiskSpace);
+            cb.getBookieInfoComplete(rc,
+                                     new BookieInfo(totalDiskSpace,
+                                                    freeDiskSpace), ctx);
         }
     }
 
     // visible for testing
-    static class AddCompletion extends CompletionValue {
+    class AddCompletion extends CompletionValue {
         final WriteCallback cb;
 
-        public AddCompletion(final PerChannelBookieClient pcbc, WriteCallback cb, Object ctx,
-                             long ledgerId, long entryId) {
-            this(pcbc, null, cb, ctx, ledgerId, entryId, null);
-        }
-
-        public AddCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger addEntryOpLogger,
+        public AddCompletion(CompletionKey key,
                              final WriteCallback originalCallback,
-                             final Object originalCtx, final long ledgerId, final long entryId,
-                             final Timeout timeout) {
-            super(originalCtx, ledgerId, entryId, timeout);
-            final long startTime = MathUtils.nowInNano();
-            this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() {
+                             final Object originalCtx,
+                             final long ledgerId, final long entryId) {
+            super("Add", originalCtx, ledgerId, entryId,
+                  addEntryOpLogger, addTimeoutOpLogger,
+                  scheduleTimeout(key, addEntryTimeout));
+            this.cb = new WriteCallback() {
                 @Override
-                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
-                    cancelTimeout();
-                    if (pcbc.addEntryOpLogger != null) {
-                        long latency = MathUtils.elapsedNanos(startTime);
-                        if (rc != BKException.Code.OK) {
-                            pcbc.addEntryOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
-                        } else {
-                            pcbc.addEntryOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
-                        }
-                    }
-
-                    if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) {
-                        pcbc.recordError();
-                    }
-
-                    originalCallback.writeComplete(rc, ledgerId, entryId, addr, originalCtx);
+                public void writeComplete(int rc, long ledgerId, long entryId,
+                                          BookieSocketAddress addr,
+                                          Object ctx) {
+                    cancelTimeoutAndLogOp(rc);
+                    originalCallback.writeComplete(rc, ledgerId, entryId,
+                                                   addr, originalCtx);
                 }
             };
         }
+
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
+
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(
+                    () -> cb.writeComplete(rc, ledgerId, entryId, addr, ctx));
+        }
+
+        @Override
+        public void handleV2Response(
+                long ledgerId, long entryId, StatusCode status,
+                BookieProtocol.Response response) {
+            handleResponse(ledgerId, entryId, status);
+        }
+
+        @Override
+        public void handleV3Response(
+                BookkeeperProtocol.Response response) {
+            AddResponse addResponse = response.getAddResponse();
+            StatusCode status = response.getStatus() == StatusCode.EOK
+                ? addResponse.getStatus() : response.getStatus();
+            handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
+                           status);
+        }
+
+        private void handleResponse(long ledgerId, long entryId,
+                                    StatusCode status) {
+            int rc = logAndConvertStatus(status,
+                                         BKException.Code.WriteException,
+                                         "ledger", ledgerId,
+                                         "entry", entryId);
+            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+        }
     }
 
     // visable for testing
@@ -1936,16 +1687,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     abstract class CompletionKey implements TimerTask {
         final long txnId;
         final OperationType operationType;
-        final long requestAt;
 
-        CompletionKey(long txnId, OperationType operationType) {
+        CompletionKey(long txnId,
+                      OperationType operationType) {
             this.txnId = txnId;
             this.operationType = operationType;
-            this.requestAt = MathUtils.nowInNano();
-        }
-
-        private long elapsedTime() {
-            return MathUtils.elapsedNanos(requestAt);
         }
 
         @Override
@@ -1953,26 +1699,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             if (timeout.isCancelled()) {
                 return;
             }
-            if (OperationType.ADD_ENTRY == operationType) {
-                errorOutAddKey(this, BKException.Code.TimeoutException);
-                addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else if (OperationType.READ_ENTRY == operationType) {
-                errorOutReadKey(this, BKException.Code.TimeoutException);
-                readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else if (OperationType.WRITE_LAC == operationType) {
-                errorOutWriteLacKey(this, BKException.Code.TimeoutException);
-                writeLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else if (OperationType.READ_LAC == operationType) {
-                errorOutReadLacKey(this, BKException.Code.TimeoutException);
-                readLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else if (OperationType.GET_BOOKIE_INFO == operationType) {
-                errorOutGetBookieInfoKey(this, BKException.Code.TimeoutException);
-                getBookieInfoTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
-            } else if (OperationType.START_TLS == operationType) {
-                errorStartTLS(BKException.Code.TimeoutException);
-            } else {
-                errorOutGetBookieInfoKey(this, BKException.Code.TimeoutException);
-                getBookieInfoTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS);
+            CompletionValue completion = completionObjects.remove(this);
+            if (completion != null) {
+                completion.timeout();
             }
         }
     }
@@ -2051,7 +1780,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             return String.format("%d:%d %s", ledgerId, entryId, operationType);
         }
     }
-    
+
     public class ConnectionFutureListener implements ChannelFutureListener {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
@@ -2121,8 +1850,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         assert state == ConnectionState.CONNECTING;
         final long txnId = getTxnId();
         final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.START_TLS);
-        completionObjects.put(completionKey, new StartTLSCompletion(this, startTLSOpLogger, null, null,
-                scheduleTimeout(completionKey, startTLSTimeout)));
+        completionObjects.put(completionKey,
+                              new StartTLSCompletion(completionKey));
         BookkeeperProtocol.Request.Builder h = BookkeeperProtocol.Request.newBuilder();
         BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
                 .setVersion(ProtocolVersion.VERSION_THREE)
@@ -2131,16 +1860,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         h.setHeader(headerBuilder.build());
         h.setStartTLSRequest(BookkeeperProtocol.StartTLSRequest.newBuilder().build());
         state = ConnectionState.START_TLS;
-        channel.writeAndFlush(h.build()).addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.error("Failed to send START_TLS request");
-                    failTLS(BKException.Code.SecurityException);
-                }
-            }
-        });
+        writeAndFlush(channel, completionKey, h.build());
     }
+
     private void failTLS(int rc) {
         LOG.error("TLS failure on: {}, rc: {}", channel, rc);
         Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.