You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/08/14 09:56:23 UTC

[bookkeeper] branch master updated: Make bookie client an interface

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d8590  Make bookie client an interface
36d8590 is described below

commit 36d8590e42f8f34f79568dad7494c38c28adecb4
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Aug 14 11:56:15 2018 +0200

    Make bookie client an interface
    
    So that it can be mocked easily for testing.
    This patch also contains a simple mock.
    
    Author: Ivan Kelly <iv...@apache.org>
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1595 from ivankelly/bookie-client-iface
---
 .../apache/bookkeeper/benchmark/BenchBookie.java   |   3 +-
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   3 +-
 .../org/apache/bookkeeper/client/BookKeeper.java   |   5 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java |  12 +-
 .../org/apache/bookkeeper/proto/BookieClient.java  | 713 +++++----------------
 .../{BookieClient.java => BookieClientImpl.java}   |  37 +-
 .../apache/bookkeeper/client/SlowBookieTest.java   |   3 +-
 .../client/TestGetBookieInfoTimeout.java           |   3 +-
 .../apache/bookkeeper/proto/MockBookieClient.java  | 288 +++++++++
 .../apache/bookkeeper/test/BookieClientTest.java   |  13 +-
 10 files changed, 503 insertions(+), 577 deletions(-)

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index e1c5979..216aff9 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -174,7 +175,7 @@ public class BenchBookie {
                 new DefaultThreadFactory("BookKeeperClientScheduler"));
 
         ClientConfiguration conf = new ClientConfiguration();
-        BookieClient bc = new BookieClient(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
         LatencyCallback lc = new LatencyCallback();
 
         ThroughputCallback tc = new ThroughputCallback();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 5b4eff8..4434cfc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -100,6 +100,7 @@ import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -829,7 +830,7 @@ public class BookieShell implements Tool {
                     ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                         new DefaultThreadFactory("BookKeeperClientSchedulerPool"));
 
-                    BookieClient bookieClient = new BookieClient(conf, eventLoopGroup, executor,
+                    BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup, executor,
                         scheduler, NullStatsLogger.INSTANCE);
 
                     LongStream.range(firstEntry, lastEntry).forEach(entryId -> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 09d2526..1d909b6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -74,6 +74,7 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.stats.Counter;
@@ -507,8 +508,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
             this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
         }
         // initialize bookie client
-        this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool,
-                                             scheduler, statsLogger);
+        this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.mainWorkerPool,
+                                                 scheduler, statsLogger);
         this.bookieWatcher = new BookieWatcherImpl(
                 conf, this.placementPolicy, metadataDriver.getRegistrationClient(),
                 this.statsLogger.scope(WATCHER_SCOPE));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a646651..535584b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -80,7 +80,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
-import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
 import org.apache.bookkeeper.stats.Counter;
@@ -99,8 +98,6 @@ import org.slf4j.LoggerFactory;
 public class LedgerHandle implements WriteHandle {
     static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
 
-    static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
-
     final byte[] ledgerKey;
     private LedgerMetadata metadata;
     final BookKeeper bk;
@@ -225,14 +222,7 @@ public class LedgerHandle implements WriteHandle {
 
             @Override
             public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
-                PerChannelBookieClientPool pcbcPool = bk.bookieClient.lookupClient(bookieSocketAddress);
-                if (pcbcPool == null) {
-                    return 0;
-                } else if (pcbcPool.isWritable(ledgerId)) {
-                    return pcbcPool.getNumPendingCompletionRequests();
-                } else {
-                    return pcbcPool.getNumPendingCompletionRequests() | PENDINGREQ_NOTWRITABLE_MASK;
-                }
+                return bk.bookieClient.getNumPendingRequests(bookieSocketAddress, ledgerId);
             }
         };
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 197483d..85a4ef9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -20,579 +20,208 @@
  */
 package org.apache.bookkeeper.proto;
 
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ExtensionRegistry;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
-import org.apache.bookkeeper.auth.ClientAuthProvider;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.SafeRunnable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.tls.SecurityException;
-import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Implements the client-side part of the BookKeeper protocol.
- *
+ * Low level client for talking to bookies.
  */
-public class BookieClient implements PerChannelBookieClientFactory {
-    static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
-
-    // This is global state that should be across all BookieClients
-    AtomicLong totalBytesOutstanding = new AtomicLong();
-
-    OrderedExecutor executor;
-    ScheduledExecutorService scheduler;
-    ScheduledFuture<?> timeoutFuture;
-
-    EventLoopGroup eventLoopGroup;
-    final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
-            new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
-
-    private final ClientAuthProvider.Factory authProviderFactory;
-    private final ExtensionRegistry registry;
-
-    private final ClientConfiguration conf;
-    private volatile boolean closed;
-    private final ReentrantReadWriteLock closeLock;
-    private final StatsLogger statsLogger;
-    private final int numConnectionsPerBookie;
-
-    private final long bookieErrorThresholdPerInterval;
-
-    public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
-                        OrderedExecutor executor, ScheduledExecutorService scheduler,
-                        StatsLogger statsLogger) throws IOException {
-        this.conf = conf;
-        this.eventLoopGroup = eventLoopGroup;
-        this.executor = executor;
-        this.closed = false;
-        this.closeLock = new ReentrantReadWriteLock();
-
-        this.registry = ExtensionRegistry.newInstance();
-        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf);
-
-        this.statsLogger = statsLogger;
-        this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
-        this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval();
-
-        this.scheduler = scheduler;
-        if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
-            SafeRunnable monitor = safeRun(() -> {
-                monitorPendingOperations();
-            });
-            this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
-                                                                    conf.getTimeoutMonitorIntervalSec(),
-                                                                    conf.getTimeoutMonitorIntervalSec(),
-                                                                    TimeUnit.SECONDS);
-        }
-    }
-
-    private int getRc(int rc) {
-        if (BKException.Code.OK == rc) {
-            return rc;
-        } else {
-            if (closed) {
-                return BKException.Code.ClientClosedException;
-            } else {
-                return rc;
-            }
-        }
-    }
-
-    public List<BookieSocketAddress> getFaultyBookies() {
-        List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
-        for (PerChannelBookieClientPool channelPool : channels.values()) {
-            if (channelPool instanceof DefaultPerChannelBookieClientPool) {
-                DefaultPerChannelBookieClientPool pool = (DefaultPerChannelBookieClientPool) channelPool;
-                if (pool.errorCounter.getAndSet(0) >= bookieErrorThresholdPerInterval) {
-                    faultyBookies.add(pool.address);
-                }
-            }
-        }
-        return faultyBookies;
-    }
-
-    public boolean isWritable(BookieSocketAddress address, long key) {
-        final PerChannelBookieClientPool pcbcPool = lookupClient(address);
-        // if null, let the write initiate connect of fail with whatever error it produces
-        return pcbcPool == null || pcbcPool.isWritable(key);
-    }
-
-    @Override
-    public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
-            SecurityHandlerFactory shFactory) throws SecurityException {
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger,
-                                          authProviderFactory, registry, pcbcPool, shFactory);
-    }
-
-    public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
-        PerChannelBookieClientPool clientPool = channels.get(addr);
-        if (null == clientPool) {
-            closeLock.readLock().lock();
-            try {
-                if (closed) {
-                    return null;
-                }
-                PerChannelBookieClientPool newClientPool =
-                    new DefaultPerChannelBookieClientPool(conf, this, addr, numConnectionsPerBookie);
-                PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
-                if (null == oldClientPool) {
-                    clientPool = newClientPool;
-                    // initialize the pool only after we put the pool into the map
-                    clientPool.intialize();
-                } else {
-                    clientPool = oldClientPool;
-                    newClientPool.close(false);
-                }
-            } catch (SecurityException e) {
-                LOG.error("Security Exception in creating new default PCBC pool: ", e);
-                return null;
-            } finally {
-                closeLock.readLock().unlock();
-            }
-        }
-        return clientPool;
-    }
-
-    public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
-            final ForceLedgerCallback cb, final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.forceLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                              ledgerId, addr, ctx);
-            return;
-        }
-
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.forceLedgerComplete(rc, ledgerId, addr, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
-                }
-            } else {
-                pcbc.forceLedger(ledgerId, cb, ctx);
-            }
-        }, ledgerId);
-    }
-
-    public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
-            final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                              ledgerId, addr, ctx);
-            return;
-        }
-
-        toSend.retain();
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.writeLacComplete(rc, ledgerId, addr, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
-                }
-            } else {
-                pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
-            }
-
-            toSend.release();
-        }, ledgerId);
-    }
-
-    private void completeAdd(final int rc,
-                             final long ledgerId,
-                             final long entryId,
-                             final BookieSocketAddress addr,
-                             final WriteCallback cb,
-                             final Object ctx) {
-        try {
-            executor.executeOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
-                }
-                @Override
-                public String toString() {
-                    return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
-        }
-    }
-
-    public void addEntry(final BookieSocketAddress addr,
-                         final long ledgerId,
-                         final byte[] masterKey,
-                         final long entryId,
-                         final ByteBufList toSend,
-                         final WriteCallback cb,
-                         final Object ctx,
-                         final int options,
-                         final boolean allowFastFail,
-                         final EnumSet<WriteFlag> writeFlags) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
-                        ledgerId, entryId, addr, cb, ctx);
-            return;
-        }
-
-        // Retain the buffer, since the connection could be obtained after
-        // the PendingApp might have already failed
-        toSend.retain();
-
-        client.obtain(ChannelReadyForAddEntryCallback.create(
-                              this, toSend, ledgerId, entryId, addr,
-                                  ctx, cb, options, masterKey, allowFastFail, writeFlags),
-                      ledgerId);
-    }
-
-    private void completeRead(final int rc,
-                              final long ledgerId,
-                              final long entryId,
-                              final ByteBuf entry,
-                              final ReadEntryCallback cb,
-                              final Object ctx) {
-        try {
-            executor.executeOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
-                                 ledgerId, entryId, entry, ctx);
-        }
-    }
-
-    private static class ChannelReadyForAddEntryCallback
-        implements GenericCallback<PerChannelBookieClient> {
-        private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
-
-        private BookieClient bookieClient;
-        private ByteBufList toSend;
-        private long ledgerId;
-        private long entryId;
-        private BookieSocketAddress addr;
-        private Object ctx;
-        private WriteCallback cb;
-        private int options;
-        private byte[] masterKey;
-        private boolean allowFastFail;
-        private EnumSet<WriteFlag> writeFlags;
-
-        static ChannelReadyForAddEntryCallback create(
-                BookieClient bookieClient, ByteBufList toSend, long ledgerId,
-                long entryId, BookieSocketAddress addr, Object ctx,
-                WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
-                EnumSet<WriteFlag> writeFlags) {
-            ChannelReadyForAddEntryCallback callback = RECYCLER.get();
-            callback.bookieClient = bookieClient;
-            callback.toSend = toSend;
-            callback.ledgerId = ledgerId;
-            callback.entryId = entryId;
-            callback.addr = addr;
-            callback.ctx = ctx;
-            callback.cb = cb;
-            callback.options = options;
-            callback.masterKey = masterKey;
-            callback.allowFastFail = allowFastFail;
-            callback.writeFlags = writeFlags;
-            return callback;
-        }
-
-        @Override
-        public void operationComplete(final int rc,
-                                      PerChannelBookieClient pcbc) {
-            if (rc != BKException.Code.OK) {
-                bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
-            } else {
-                pcbc.addEntry(ledgerId, masterKey, entryId,
-                              toSend, cb, ctx, options, allowFastFail, writeFlags);
-            }
-
-            toSend.release();
-            recycle();
-        }
-
-        private ChannelReadyForAddEntryCallback(
-                Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
+public interface BookieClient {
+    long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
 
-        private static final Recycler<ChannelReadyForAddEntryCallback> RECYCLER =
-            new Recycler<ChannelReadyForAddEntryCallback>() {
-                    protected ChannelReadyForAddEntryCallback newObject(
-                            Recycler.Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
-                        return new ChannelReadyForAddEntryCallback(recyclerHandle);
-                    }
-                };
-
-        public void recycle() {
-            bookieClient = null;
-            toSend = null;
-            ledgerId = -1;
-            entryId = -1;
-            addr = null;
-            ctx = null;
-            cb = null;
-            options = -1;
-            masterKey = null;
-            allowFastFail = false;
-            writeFlags = null;
-            recyclerHandle.recycle(this);
-        }
-    }
-
-    public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
-            final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
-                    ctx);
-            return;
-        }
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.readLacComplete(rc, ledgerId, null, null, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.readLacComplete(getRc(BKException.Code.InterruptedException),
-                            ledgerId, null, null, ctx);
-                }
-            } else {
-                pcbc.readLac(ledgerId, cb, ctx);
-            }
-        }, ledgerId);
-    }
-
-    public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
-                          ReadEntryCallback cb, Object ctx, int flags) {
-        readEntry(addr, ledgerId, entryId, cb, ctx, flags, null);
-    }
-
-    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
-                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
-        readEntry(addr, ledgerId, entryId, cb, ctx, flags, masterKey, false);
-    }
-
-    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
-                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
-                          final boolean allowFastFail) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                                 ledgerId, entryId, null, ctx);
-            return;
-        }
+    /**
+     * Get the list of bookies which have exhibited more error responses
+     * than a configured threshold.
+     *
+     * @return the list of faulty bookies
+     */
+    List<BookieSocketAddress> getFaultyBookies();
 
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                completeRead(rc, ledgerId, entryId, null, cb, ctx);
-            } else {
-                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
-            }
-        }, ledgerId);
-    }
+    /**
+     * Check whether the channel used to write to a bookie channel is writable.
+     * A channel becomes non-writable when its buffer become full, and will stay
+     * non-writable until some of the buffer is cleared.
+     *
+     * <p>This can be used to apply backpressure. If a channel is not writable,
+     * requests will end up queuing.
+     *
+     * <p>As as we use pooling, there may be multiple channels per bookie, so
+     * we also pass the ledger ID to check the writability of the correct
+     * channel.
+     *
+     * <p>This has nothing to do with the bookie read-only status.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger we wish to send a request to
+     *
+     */
+    boolean isWritable(BookieSocketAddress address, long ledgerId);
 
+    /**
+     * Get the number of outstanding requests on the channel used to connect
+     * to a bookie at {@code address} for a ledger with {@code ledgerId}.
+     * It is necessary to specify the ledgerId as there may be multiple
+     * channels for a single bookie if pooling is in use.
+     * If the bookie is not {@link #isWritable(BookieSocketAddress,long) writable},
+     * then the {@link #PENDINGREQ_NOTWRITABLE_MASK} will be logically or'd with
+     * the returned value.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose channel we wish to query
+     * @return the number of requests currently outstanding
+     */
+    long getNumPendingRequests(BookieSocketAddress address, long ledgerId);
 
-    public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
-                                          final long ledgerId,
-                                          final long entryId,
-                                          final long previousLAC,
-                                          final long timeOutInMillis,
-                                          final boolean piggyBackEntry,
-                                          final ReadEntryCallback cb,
-                                          final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            completeRead(BKException.Code.BookieHandleNotAvailableException,
-                    ledgerId, entryId, null, cb, ctx);
-            return;
-        }
+    /**
+     * Send a force request to the server. When complete all entries which have
+     * been written for {@code ledgerId} to this bookie will be persisted on disk.
+     * This is for use with {@link org.apache.bookkeeper.client.api.WriteFlag#DEFERRED_SYNC}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose entries we want persisted
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void forceLedger(BookieSocketAddress address, long ledgerId,
+                     ForceLedgerCallback cb, Object ctx);
 
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                completeRead(rc, ledgerId, entryId, null, cb, ctx);
-            } else {
-                pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
-                        ctx);
-            }
-        }, ledgerId);
-    }
+    /**
+     * Read the last add confirmed for ledger {@code ledgerId} from the bookie at
+     * {@code address}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose last add confirm we wish to know
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void readLac(BookieSocketAddress address, long ledgerId, ReadLacCallback cb, Object ctx);
 
-    public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb,
-            final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
-                    ctx);
-            return;
-        }
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.submit(safeRun(() -> {
-                        cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
-                            new BookieInfo(), ctx);
-                }
-            } else {
-                pcbc.getBookieInfo(requested, cb, ctx);
-            }
-        }, requested);
-    }
+    /**
+     * Explicitly write the last add confirmed for ledger {@code ledgerId} to the bookie at
+     * {@code address}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose last add confirm we wish to know
+     * @param masterKey the master key of the ledger
+     * @param lac the last add confirmed we wish to write
+     * @param toSend a buffer also containing the lac, along with a digest
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void writeLac(BookieSocketAddress address, long ledgerId, byte[] masterKey,
+                  long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx);
 
-    private void monitorPendingOperations() {
-        for (PerChannelBookieClientPool clientPool : channels.values()) {
-            clientPool.checkTimeoutOnPendingOperations();
-        }
-    }
+    /**
+     * Add an entry for ledger {@code ledgerId} on the bookie at address {@code address}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger to which we wish to add the entry
+     * @param entryId the id of the entry we wish to add
+     * @param toSend a buffer containing the entry and its digest
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     * @param options a bit mask of flags from BookieProtocol.FLAG_*
+     *                {@link org.apache.bookkeeper.proto.BookieProtocol}
+     * @param allowFastFail fail the add immediately if the channel is non-writable
+     *                      {@link #isWritable(BookieSocketAddress,long)}
+     * @param writeFlags a set of write flags
+     *                   {@link org.apache.bookkeeper.client.api.WriteFlags}
+     */
+    void addEntry(BookieSocketAddress address, long ledgerId, byte[] masterKey,
+                  long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
+                  int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags);
 
-    public boolean isClosed() {
-        return closed;
+    /**
+     * Read entry with a null masterkey, disallowing failfast.
+     * @see #readEntry(BookieSocketAddress,long,long,ReadEntryCallback,Object,int,byte[],boolean)
+     */
+    default void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+                           ReadEntryCallback cb, Object ctx, int flags) {
+        readEntry(address, ledgerId, entryId, cb, ctx, flags, null);
     }
 
-    public void close() {
-        closeLock.writeLock().lock();
-        try {
-            closed = true;
-            for (PerChannelBookieClientPool pool : channels.values()) {
-                pool.close(true);
-            }
-            channels.clear();
-            authProviderFactory.close();
-
-            if (timeoutFuture != null) {
-                timeoutFuture.cancel(false);
-            }
-        } finally {
-            closeLock.writeLock().unlock();
-        }
+    /**
+     * Read entry, disallowing failfast.
+     * @see #readEntry(BookieSocketAddress,long,long,ReadEntryCallback,Object,int,byte[],boolean)
+     */
+    default void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+                           ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey) {
+        readEntry(address, ledgerId, entryId, cb, ctx, flags, masterKey, false);
     }
 
-    private static class Counter {
-        int i;
-        int total;
-
-        synchronized void inc() {
-            i++;
-            total++;
-        }
-
-        synchronized void dec() {
-            i--;
-            notifyAll();
-        }
-
-        synchronized void wait(int limit) throws InterruptedException {
-            while (i > limit) {
-                wait();
-            }
-        }
+    /**
+     * Read an entry from bookie at address {@code address}.
+     *
+     * @param address address of the bookie to read from
+     * @param ledgerId id of the ledger the entry belongs to
+     * @param entryId id of the entry we wish to read
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     * @param flags a bit mask of flags from BookieProtocol.FLAG_*
+     *              {@link org.apache.bookkeeper.proto.BookieProtocol}
+     * @param masterKey the master key of the ledger being read from. This is only required
+     *                  if the FLAG_DO_FENCING is specified.
+     * @param allowFastFail fail the read immediately if the channel is non-writable
+     *                      {@link #isWritable(BookieSocketAddress,long)}
+     */
+    void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+                   ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
+                   boolean allowFastFail);
 
-        synchronized int total() {
-            return total;
-        }
-    }
+    /**
+     * Send a long poll request to bookie, waiting for the last add confirmed
+     * to be updated. The client can also request that the full entry is returned
+     * with the new last add confirmed.
+     *
+     * @param address address of bookie to send the long poll address to
+     * @param ledgerId ledger whose last add confirmed we are interested in
+     * @param entryId the id of the entry we expect to read
+     * @param previousLAC the previous lac value
+     * @param timeOutInMillis number of millis to wait for LAC update
+     * @param piggyBackEntry whether to read the requested entry when LAC is updated
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void readEntryWaitForLACUpdate(BookieSocketAddress address,
+                                   long ledgerId,
+                                   long entryId,
+                                   long previousLAC,
+                                   long timeOutInMillis,
+                                   boolean piggyBackEntry,
+                                   ReadEntryCallback cb,
+                                   Object ctx);
 
     /**
-     * @param args
-     * @throws IOException
-     * @throws NumberFormatException
-     * @throws InterruptedException
+     * Read information about the bookie, from the bookie.
+     *
+     * @param address the address of the bookie to request information from
+     * @param requested a bitset specifying which pieces of information to request
+     *                  {@link org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest}
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     *
+     * @see org.apache.bookkeeper.client.BookieInfoReader.BookieInfo
      */
-    public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
-        if (args.length != 3) {
-            System.err.println("USAGE: BookieClient bookieHost port ledger#");
-            return;
-        }
-        WriteCallback cb = new WriteCallback() {
+    void getBookieInfo(BookieSocketAddress address, long requested,
+                       GetBookieInfoCallback cb, Object ctx);
 
-            public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) {
-                Counter counter = (Counter) ctx;
-                counter.dec();
-                if (rc != 0) {
-                    System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
-                }
-            }
-        };
-        Counter counter = new Counter();
-        byte hello[] = "hello".getBytes(UTF_8);
-        long ledger = Long.parseLong(args[2]);
-        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
-        OrderedExecutor executor = OrderedExecutor.newBuilder()
-                .name("BookieClientWorker")
-                .numThreads(1)
-                .build();
-        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory("BookKeeperClientScheduler"));
-        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
-        BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
+    /**
+     * @return whether bookie client object has been closed
+     */
+    boolean isClosed();
 
-        for (int i = 0; i < 100000; i++) {
-            counter.inc();
-            bc.addEntry(addr, ledger, new byte[0], i,
-                    ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0, false,
-                    WriteFlag.NONE);
-        }
-        counter.wait(0);
-        System.out.println("Total = " + counter.total());
-        scheduler.shutdown();
-        eventLoopGroup.shutdownGracefully();
-        executor.shutdown();
-    }
+    /**
+     * Close the bookie client object.
+     */
+    void close();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
similarity index 95%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 197483d..50dd85f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -43,7 +43,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
@@ -74,12 +73,9 @@ import org.slf4j.LoggerFactory;
  * Implements the client-side part of the BookKeeper protocol.
  *
  */
-public class BookieClient implements PerChannelBookieClientFactory {
+public class BookieClientImpl implements BookieClient, PerChannelBookieClientFactory {
     static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
 
-    // This is global state that should be across all BookieClients
-    AtomicLong totalBytesOutstanding = new AtomicLong();
-
     OrderedExecutor executor;
     ScheduledExecutorService scheduler;
     ScheduledFuture<?> timeoutFuture;
@@ -99,9 +95,9 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     private final long bookieErrorThresholdPerInterval;
 
-    public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
-                        OrderedExecutor executor, ScheduledExecutorService scheduler,
-                        StatsLogger statsLogger) throws IOException {
+    public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
+                            OrderedExecutor executor, ScheduledExecutorService scheduler,
+                            StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.eventLoopGroup = eventLoopGroup;
         this.executor = executor;
@@ -139,6 +135,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         }
     }
 
+    @Override
     public List<BookieSocketAddress> getFaultyBookies() {
         List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
         for (PerChannelBookieClientPool channelPool : channels.values()) {
@@ -152,6 +149,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         return faultyBookies;
     }
 
+    @Override
     public boolean isWritable(BookieSocketAddress address, long key) {
         final PerChannelBookieClientPool pcbcPool = lookupClient(address);
         // if null, let the write initiate connect of fail with whatever error it produces
@@ -159,6 +157,18 @@ public class BookieClient implements PerChannelBookieClientFactory {
     }
 
     @Override
+    public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) {
+        PerChannelBookieClientPool pcbcPool = lookupClient(address);
+        if (pcbcPool == null) {
+            return 0;
+        } else if (pcbcPool.isWritable(ledgerId)) {
+            return pcbcPool.getNumPendingCompletionRequests();
+        } else {
+            return pcbcPool.getNumPendingCompletionRequests() | PENDINGREQ_NOTWRITABLE_MASK;
+        }
+    }
+
+    @Override
     public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
             SecurityHandlerFactory shFactory) throws SecurityException {
         return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger,
@@ -194,6 +204,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         return clientPool;
     }
 
+    @Override
     public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
             final ForceLedgerCallback cb, final Object ctx) {
         final PerChannelBookieClientPool client = lookupClient(addr);
@@ -218,6 +229,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         }, ledgerId);
     }
 
+    @Override
     public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
             final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
         final PerChannelBookieClientPool client = lookupClient(addr);
@@ -267,6 +279,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         }
     }
 
+    @Override
     public void addEntry(final BookieSocketAddress addr,
                          final long ledgerId,
                          final byte[] masterKey,
@@ -317,7 +330,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         implements GenericCallback<PerChannelBookieClient> {
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
 
-        private BookieClient bookieClient;
+        private BookieClientImpl bookieClient;
         private ByteBufList toSend;
         private long ledgerId;
         private long entryId;
@@ -330,7 +343,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         private EnumSet<WriteFlag> writeFlags;
 
         static ChannelReadyForAddEntryCallback create(
-                BookieClient bookieClient, ByteBufList toSend, long ledgerId,
+                BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
                 long entryId, BookieSocketAddress addr, Object ctx,
                 WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
                 EnumSet<WriteFlag> writeFlags) {
@@ -579,8 +592,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 .build();
         ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                 new DefaultThreadFactory("BookKeeperClientScheduler"));
-        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
+        BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+                                                   scheduler, NullStatsLogger.INSTANCE);
         BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
 
         for (int i = 0; i < 100000; i++) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 7c6f9a0..71a9b11 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
@@ -296,7 +297,7 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
 
     private void setTargetChannelState(BookKeeper bkc, BookieSocketAddress address,
                                        long key, boolean state) throws Exception {
-        bkc.getBookieClient().lookupClient(address).obtain((rc, pcbc) -> {
+        ((BookieClientImpl) bkc.getBookieClient()).lookupClient(address).obtain((rc, pcbc) -> {
             pcbc.setWritable(state);
         }, key);
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index a91dffa..6291413 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -107,7 +108,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
         // try to get bookie info from the sleeping bookie. It should fail with timeout error
         BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
                 bookieToSleep.getPort());
-        BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
new file mode 100644
index 0000000..79cc5ba
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.SafeRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock implementation of BookieClient.
+ */
+public class MockBookieClient implements BookieClient {
+    static final Logger LOG = LoggerFactory.getLogger(MockBookieClient.class);
+
+    final OrderedExecutor executor;
+    final ConcurrentHashMap<BookieSocketAddress, ConcurrentHashMap<Long, LedgerData>> data = new ConcurrentHashMap<>();
+    final Set<BookieSocketAddress> errorBookies =
+        Collections.newSetFromMap(new ConcurrentHashMap<BookieSocketAddress, Boolean>());
+
+    final Map<BookieSocketAddress, Boolean> stalledBookies = new HashMap<>();
+    final Map<BookieSocketAddress, List<Consumer<Integer>>> stalledRequests = new HashMap<>();
+
+    public MockBookieClient(OrderedExecutor executor) {
+        this.executor = executor;
+    }
+
+    public void stallBookie(BookieSocketAddress bookie) {
+        synchronized (this) {
+            stalledBookies.put(bookie, true);
+        }
+    }
+
+    public void releaseStalledBookie(BookieSocketAddress bookie, int rc) {
+        synchronized (this) {
+            stalledBookies.remove(bookie);
+            stalledRequests.remove(bookie).forEach((r) -> r.accept(rc));
+        }
+    }
+
+    public void errorBookies(BookieSocketAddress... bookies) {
+        for (BookieSocketAddress b : bookies) {
+            errorBookies.add(b);
+        }
+    }
+
+    public void removeErrors(BookieSocketAddress... bookies) {
+        for (BookieSocketAddress b : bookies) {
+            errorBookies.remove(b);
+        }
+    }
+
+    @Override
+    public List<BookieSocketAddress> getFaultyBookies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isWritable(BookieSocketAddress address, long ledgerId) {
+        return true;
+    }
+
+    @Override
+    public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) {
+        return 0;
+    }
+
+    @Override
+    public void forceLedger(BookieSocketAddress addr, long ledgerId,
+                            ForceLedgerCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.forceLedgerComplete(BKException.Code.IllegalOpException,
+                                               ledgerId, addr, ctx);
+                    }));
+    }
+
+    @Override
+    public void writeLac(BookieSocketAddress addr, long ledgerId, byte[] masterKey,
+                         long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.writeLacComplete(BKException.Code.IllegalOpException,
+                                               ledgerId, addr, ctx);
+                    }));
+    }
+
+    @Override
+    public void addEntry(BookieSocketAddress addr, long ledgerId, byte[] masterKey,
+                         long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
+                         int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) {
+        SafeRunnable write = safeRun(() -> {
+                LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
+                if (errorBookies.contains(addr)) {
+                    LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
+                    cb.writeComplete(BKException.Code.WriteException, ledgerId, entryId, addr, ctx);
+                    return;
+                }
+                LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
+                ledger.addEntry(entryId, copyData(toSend));
+                cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
+                toSend.release();
+            });
+
+        toSend.retain();
+        synchronized (this) {
+            if (stalledBookies.getOrDefault(addr, false)) {
+                LOG.info("[{};{};{}] Stalling write {}", addr, ledgerId, System.identityHashCode(write), entryId);
+                stalledRequests.computeIfAbsent(addr, (key) -> new ArrayList<>())
+                    .add((rc) -> {
+                            LOG.info("[{};{};{}] Unstalled write {}",
+                                     addr, ledgerId, System.identityHashCode(write), entryId);
+                            if (rc == BKException.Code.OK) {
+                                executor.executeOrdered(ledgerId, write);
+                            } else {
+                                executor.executeOrdered(
+                                        ledgerId, safeRun(() -> {
+                                            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                                            toSend.release();
+                                        }));
+                            }
+                        });
+            } else {
+                executor.executeOrdered(ledgerId, write);
+            }
+        }
+    }
+
+    @Override
+    public void readLac(BookieSocketAddress addr, long ledgerId, ReadLacCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.readLacComplete(BKException.Code.IllegalOpException,
+                                           ledgerId, null, null, ctx);
+                    }));
+    }
+
+    @Override
+    public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
+                          ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
+                          boolean allowFastFail) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId);
+                        if (errorBookies.contains(addr)) {
+                            LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId);
+                            cb.readEntryComplete(BKException.Code.ReadException, ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        LedgerData ledger = getBookieData(addr).get(ledgerId);
+                        if (ledger == null) {
+                            LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
+                            cb.readEntryComplete(BKException.Code.NoSuchLedgerExistsException,
+                                                 ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        ByteBuf entry = ledger.getEntry(entryId);
+                        if (entry == null) {
+                            LOG.warn("[{};L{}] entry({}) not found", addr, ledgerId, entryId);
+                            cb.readEntryComplete(BKException.Code.NoSuchEntryException,
+                                                 ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        cb.readEntryComplete(BKException.Code.OK,
+                                             ledgerId, entryId, entry.slice(), ctx);
+                    }));
+    }
+
+    @Override
+    public void readEntryWaitForLACUpdate(BookieSocketAddress addr,
+                                          long ledgerId,
+                                          long entryId,
+                                          long previousLAC,
+                                          long timeOutInMillis,
+                                          boolean piggyBackEntry,
+                                          ReadEntryCallback cb,
+                                          Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.readEntryComplete(BKException.Code.IllegalOpException,
+                                             ledgerId, entryId, null, ctx);
+                    }));
+    }
+
+    @Override
+    public void getBookieInfo(BookieSocketAddress addr, long requested,
+                              GetBookieInfoCallback cb, Object ctx) {
+        executor.executeOrdered(addr,
+                safeRun(() -> {
+                        cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
+                                                 null, ctx);
+                    }));
+    }
+
+    @Override
+    public boolean isClosed() {
+        return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private ConcurrentHashMap<Long, LedgerData> getBookieData(BookieSocketAddress addr) {
+        return data.computeIfAbsent(addr, (key) -> new ConcurrentHashMap<>());
+    }
+
+    private static ByteBuf copyData(ByteBufList list) {
+        ByteBuf buf = Unpooled.buffer(list.readableBytes());
+        for (int i = 0; i < list.size(); i++) {
+            buf.writeBytes(list.getBuffer(i).slice());
+        }
+        return buf;
+    }
+
+    private static class LedgerData {
+        final long ledgerId;
+        private TreeMap<Long, ByteBuf> entries = new TreeMap<>();
+        LedgerData(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        void addEntry(long entryId, ByteBuf entry) {
+            entries.put(entryId, entry);
+        }
+
+        ByteBuf getEntry(long entryId) {
+            if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+                Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
+                if (lastEntry != null) {
+                    return lastEntry.getValue();
+                } else {
+                    return null;
+                }
+            } else {
+                return entries.get(entryId);
+            }
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7ff6386..c6cc72b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -47,6 +47,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
@@ -156,8 +157,8 @@ public class BookieClientTest {
         BookieSocketAddress addr = bs.getLocalAddress();
         ResultStruct arc = new ResultStruct();
 
-        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+                                               scheduler, NullStatsLogger.INSTANCE);
         ByteBufList bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
         synchronized (arc) {
@@ -257,8 +258,8 @@ public class BookieClientTest {
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+                                               scheduler, NullStatsLogger.INSTANCE);
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
@@ -269,8 +270,8 @@ public class BookieClientTest {
     @Test
     public void testGetBookieInfo() throws IOException, InterruptedException {
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClient(new ClientConfiguration(), new NioEventLoopGroup(), executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(), executor,
+                                               scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;