You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/14 09:56:27 UTC

[GitHub] ivankelly closed pull request #1595: Make bookie client an interface

ivankelly closed pull request #1595: Make bookie client an interface
URL: https://github.com/apache/bookkeeper/pull/1595
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index e1c5979516..216aff9890 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -36,6 +36,7 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -174,7 +175,7 @@ public static void main(String[] args)
                 new DefaultThreadFactory("BookKeeperClientScheduler"));
 
         ClientConfiguration conf = new ClientConfiguration();
-        BookieClient bc = new BookieClient(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
         LatencyCallback lc = new LatencyCallback();
 
         ThroughputCallback tc = new ThroughputCallback();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 5b4eff8c3a..4434cfc3ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -100,6 +100,7 @@
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -829,7 +830,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
                     ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                         new DefaultThreadFactory("BookKeeperClientSchedulerPool"));
 
-                    BookieClient bookieClient = new BookieClient(conf, eventLoopGroup, executor,
+                    BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup, executor,
                         scheduler, NullStatsLogger.INSTANCE);
 
                     LongStream.range(firstEntry, lastEntry).forEach(entryId -> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 09d2526e7b..1d909b6ad6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -74,6 +74,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.stats.Counter;
@@ -507,8 +508,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
             this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
         }
         // initialize bookie client
-        this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool,
-                                             scheduler, statsLogger);
+        this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.mainWorkerPool,
+                                                 scheduler, statsLogger);
         this.bookieWatcher = new BookieWatcherImpl(
                 conf, this.placementPolicy, metadataDriver.getRegistrationClient(),
                 this.statsLogger.scope(WATCHER_SCOPE));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a646651a61..535584b3d8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -80,7 +80,6 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
-import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
 import org.apache.bookkeeper.stats.Counter;
@@ -99,8 +98,6 @@
 public class LedgerHandle implements WriteHandle {
     static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
 
-    static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
-
     final byte[] ledgerKey;
     private LedgerMetadata metadata;
     final BookKeeper bk;
@@ -225,14 +222,7 @@ public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress) {
 
             @Override
             public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
-                PerChannelBookieClientPool pcbcPool = bk.bookieClient.lookupClient(bookieSocketAddress);
-                if (pcbcPool == null) {
-                    return 0;
-                } else if (pcbcPool.isWritable(ledgerId)) {
-                    return pcbcPool.getNumPendingCompletionRequests();
-                } else {
-                    return pcbcPool.getNumPendingCompletionRequests() | PENDINGREQ_NOTWRITABLE_MASK;
-                }
+                return bk.bookieClient.getNumPendingRequests(bookieSocketAddress, ledgerId);
             }
         };
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 197483d602..85a4ef9c16 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -20,579 +20,208 @@
  */
 package org.apache.bookkeeper.proto;
 
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ExtensionRegistry;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
-import org.apache.bookkeeper.auth.ClientAuthProvider;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.SafeRunnable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.tls.SecurityException;
-import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Implements the client-side part of the BookKeeper protocol.
- *
+ * Low level client for talking to bookies.
  */
-public class BookieClient implements PerChannelBookieClientFactory {
-    static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
-
-    // This is global state that should be across all BookieClients
-    AtomicLong totalBytesOutstanding = new AtomicLong();
-
-    OrderedExecutor executor;
-    ScheduledExecutorService scheduler;
-    ScheduledFuture<?> timeoutFuture;
-
-    EventLoopGroup eventLoopGroup;
-    final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
-            new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
-
-    private final ClientAuthProvider.Factory authProviderFactory;
-    private final ExtensionRegistry registry;
-
-    private final ClientConfiguration conf;
-    private volatile boolean closed;
-    private final ReentrantReadWriteLock closeLock;
-    private final StatsLogger statsLogger;
-    private final int numConnectionsPerBookie;
-
-    private final long bookieErrorThresholdPerInterval;
-
-    public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
-                        OrderedExecutor executor, ScheduledExecutorService scheduler,
-                        StatsLogger statsLogger) throws IOException {
-        this.conf = conf;
-        this.eventLoopGroup = eventLoopGroup;
-        this.executor = executor;
-        this.closed = false;
-        this.closeLock = new ReentrantReadWriteLock();
-
-        this.registry = ExtensionRegistry.newInstance();
-        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf);
-
-        this.statsLogger = statsLogger;
-        this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
-        this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval();
-
-        this.scheduler = scheduler;
-        if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
-            SafeRunnable monitor = safeRun(() -> {
-                monitorPendingOperations();
-            });
-            this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
-                                                                    conf.getTimeoutMonitorIntervalSec(),
-                                                                    conf.getTimeoutMonitorIntervalSec(),
-                                                                    TimeUnit.SECONDS);
-        }
-    }
-
-    private int getRc(int rc) {
-        if (BKException.Code.OK == rc) {
-            return rc;
-        } else {
-            if (closed) {
-                return BKException.Code.ClientClosedException;
-            } else {
-                return rc;
-            }
-        }
-    }
-
-    public List<BookieSocketAddress> getFaultyBookies() {
-        List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
-        for (PerChannelBookieClientPool channelPool : channels.values()) {
-            if (channelPool instanceof DefaultPerChannelBookieClientPool) {
-                DefaultPerChannelBookieClientPool pool = (DefaultPerChannelBookieClientPool) channelPool;
-                if (pool.errorCounter.getAndSet(0) >= bookieErrorThresholdPerInterval) {
-                    faultyBookies.add(pool.address);
-                }
-            }
-        }
-        return faultyBookies;
-    }
-
-    public boolean isWritable(BookieSocketAddress address, long key) {
-        final PerChannelBookieClientPool pcbcPool = lookupClient(address);
-        // if null, let the write initiate connect of fail with whatever error it produces
-        return pcbcPool == null || pcbcPool.isWritable(key);
-    }
-
-    @Override
-    public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
-            SecurityHandlerFactory shFactory) throws SecurityException {
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger,
-                                          authProviderFactory, registry, pcbcPool, shFactory);
-    }
-
-    public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
-        PerChannelBookieClientPool clientPool = channels.get(addr);
-        if (null == clientPool) {
-            closeLock.readLock().lock();
-            try {
-                if (closed) {
-                    return null;
-                }
-                PerChannelBookieClientPool newClientPool =
-                    new DefaultPerChannelBookieClientPool(conf, this, addr, numConnectionsPerBookie);
-                PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
-                if (null == oldClientPool) {
-                    clientPool = newClientPool;
-                    // initialize the pool only after we put the pool into the map
-                    clientPool.intialize();
-                } else {
-                    clientPool = oldClientPool;
-                    newClientPool.close(false);
-                }
-            } catch (SecurityException e) {
-                LOG.error("Security Exception in creating new default PCBC pool: ", e);
-                return null;
-            } finally {
-                closeLock.readLock().unlock();
-            }
-        }
-        return clientPool;
-    }
-
-    public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
-            final ForceLedgerCallback cb, final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.forceLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                              ledgerId, addr, ctx);
-            return;
-        }
-
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.forceLedgerComplete(rc, ledgerId, addr, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
-                }
-            } else {
-                pcbc.forceLedger(ledgerId, cb, ctx);
-            }
-        }, ledgerId);
-    }
-
-    public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
-            final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                              ledgerId, addr, ctx);
-            return;
-        }
-
-        toSend.retain();
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.writeLacComplete(rc, ledgerId, addr, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
-                }
-            } else {
-                pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
-            }
-
-            toSend.release();
-        }, ledgerId);
-    }
-
-    private void completeAdd(final int rc,
-                             final long ledgerId,
-                             final long entryId,
-                             final BookieSocketAddress addr,
-                             final WriteCallback cb,
-                             final Object ctx) {
-        try {
-            executor.executeOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
-                }
-                @Override
-                public String toString() {
-                    return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
-        }
-    }
-
-    public void addEntry(final BookieSocketAddress addr,
-                         final long ledgerId,
-                         final byte[] masterKey,
-                         final long entryId,
-                         final ByteBufList toSend,
-                         final WriteCallback cb,
-                         final Object ctx,
-                         final int options,
-                         final boolean allowFastFail,
-                         final EnumSet<WriteFlag> writeFlags) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
-                        ledgerId, entryId, addr, cb, ctx);
-            return;
-        }
-
-        // Retain the buffer, since the connection could be obtained after
-        // the PendingApp might have already failed
-        toSend.retain();
-
-        client.obtain(ChannelReadyForAddEntryCallback.create(
-                              this, toSend, ledgerId, entryId, addr,
-                                  ctx, cb, options, masterKey, allowFastFail, writeFlags),
-                      ledgerId);
-    }
-
-    private void completeRead(final int rc,
-                              final long ledgerId,
-                              final long entryId,
-                              final ByteBuf entry,
-                              final ReadEntryCallback cb,
-                              final Object ctx) {
-        try {
-            executor.executeOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
-                                 ledgerId, entryId, entry, ctx);
-        }
-    }
-
-    private static class ChannelReadyForAddEntryCallback
-        implements GenericCallback<PerChannelBookieClient> {
-        private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
-
-        private BookieClient bookieClient;
-        private ByteBufList toSend;
-        private long ledgerId;
-        private long entryId;
-        private BookieSocketAddress addr;
-        private Object ctx;
-        private WriteCallback cb;
-        private int options;
-        private byte[] masterKey;
-        private boolean allowFastFail;
-        private EnumSet<WriteFlag> writeFlags;
-
-        static ChannelReadyForAddEntryCallback create(
-                BookieClient bookieClient, ByteBufList toSend, long ledgerId,
-                long entryId, BookieSocketAddress addr, Object ctx,
-                WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
-                EnumSet<WriteFlag> writeFlags) {
-            ChannelReadyForAddEntryCallback callback = RECYCLER.get();
-            callback.bookieClient = bookieClient;
-            callback.toSend = toSend;
-            callback.ledgerId = ledgerId;
-            callback.entryId = entryId;
-            callback.addr = addr;
-            callback.ctx = ctx;
-            callback.cb = cb;
-            callback.options = options;
-            callback.masterKey = masterKey;
-            callback.allowFastFail = allowFastFail;
-            callback.writeFlags = writeFlags;
-            return callback;
-        }
-
-        @Override
-        public void operationComplete(final int rc,
-                                      PerChannelBookieClient pcbc) {
-            if (rc != BKException.Code.OK) {
-                bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
-            } else {
-                pcbc.addEntry(ledgerId, masterKey, entryId,
-                              toSend, cb, ctx, options, allowFastFail, writeFlags);
-            }
-
-            toSend.release();
-            recycle();
-        }
-
-        private ChannelReadyForAddEntryCallback(
-                Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
+public interface BookieClient {
+    long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
 
-        private static final Recycler<ChannelReadyForAddEntryCallback> RECYCLER =
-            new Recycler<ChannelReadyForAddEntryCallback>() {
-                    protected ChannelReadyForAddEntryCallback newObject(
-                            Recycler.Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
-                        return new ChannelReadyForAddEntryCallback(recyclerHandle);
-                    }
-                };
-
-        public void recycle() {
-            bookieClient = null;
-            toSend = null;
-            ledgerId = -1;
-            entryId = -1;
-            addr = null;
-            ctx = null;
-            cb = null;
-            options = -1;
-            masterKey = null;
-            allowFastFail = false;
-            writeFlags = null;
-            recyclerHandle.recycle(this);
-        }
-    }
-
-    public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
-            final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
-                    ctx);
-            return;
-        }
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId, safeRun(() -> {
-                        cb.readLacComplete(rc, ledgerId, null, null, ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.readLacComplete(getRc(BKException.Code.InterruptedException),
-                            ledgerId, null, null, ctx);
-                }
-            } else {
-                pcbc.readLac(ledgerId, cb, ctx);
-            }
-        }, ledgerId);
-    }
-
-    public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
-                          ReadEntryCallback cb, Object ctx, int flags) {
-        readEntry(addr, ledgerId, entryId, cb, ctx, flags, null);
-    }
-
-    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
-                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
-        readEntry(addr, ledgerId, entryId, cb, ctx, flags, masterKey, false);
-    }
-
-    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
-                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
-                          final boolean allowFastFail) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
-                                 ledgerId, entryId, null, ctx);
-            return;
-        }
+    /**
+     * Get the list of bookies which have exhibited more error responses
+     * than a configured threshold.
+     *
+     * @return the list of faulty bookies
+     */
+    List<BookieSocketAddress> getFaultyBookies();
 
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                completeRead(rc, ledgerId, entryId, null, cb, ctx);
-            } else {
-                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
-            }
-        }, ledgerId);
-    }
+    /**
+     * Check whether the channel used to write to a bookie channel is writable.
+     * A channel becomes non-writable when its buffer become full, and will stay
+     * non-writable until some of the buffer is cleared.
+     *
+     * <p>This can be used to apply backpressure. If a channel is not writable,
+     * requests will end up queuing.
+     *
+     * <p>As as we use pooling, there may be multiple channels per bookie, so
+     * we also pass the ledger ID to check the writability of the correct
+     * channel.
+     *
+     * <p>This has nothing to do with the bookie read-only status.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger we wish to send a request to
+     *
+     */
+    boolean isWritable(BookieSocketAddress address, long ledgerId);
 
+    /**
+     * Get the number of outstanding requests on the channel used to connect
+     * to a bookie at {@code address} for a ledger with {@code ledgerId}.
+     * It is necessary to specify the ledgerId as there may be multiple
+     * channels for a single bookie if pooling is in use.
+     * If the bookie is not {@link #isWritable(BookieSocketAddress,long) writable},
+     * then the {@link #PENDINGREQ_NOTWRITABLE_MASK} will be logically or'd with
+     * the returned value.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose channel we wish to query
+     * @return the number of requests currently outstanding
+     */
+    long getNumPendingRequests(BookieSocketAddress address, long ledgerId);
 
-    public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
-                                          final long ledgerId,
-                                          final long entryId,
-                                          final long previousLAC,
-                                          final long timeOutInMillis,
-                                          final boolean piggyBackEntry,
-                                          final ReadEntryCallback cb,
-                                          final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            completeRead(BKException.Code.BookieHandleNotAvailableException,
-                    ledgerId, entryId, null, cb, ctx);
-            return;
-        }
+    /**
+     * Send a force request to the server. When complete all entries which have
+     * been written for {@code ledgerId} to this bookie will be persisted on disk.
+     * This is for use with {@link org.apache.bookkeeper.client.api.WriteFlag#DEFERRED_SYNC}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose entries we want persisted
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void forceLedger(BookieSocketAddress address, long ledgerId,
+                     ForceLedgerCallback cb, Object ctx);
 
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                completeRead(rc, ledgerId, entryId, null, cb, ctx);
-            } else {
-                pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
-                        ctx);
-            }
-        }, ledgerId);
-    }
+    /**
+     * Read the last add confirmed for ledger {@code ledgerId} from the bookie at
+     * {@code address}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose last add confirm we wish to know
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void readLac(BookieSocketAddress address, long ledgerId, ReadLacCallback cb, Object ctx);
 
-    public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb,
-            final Object ctx) {
-        final PerChannelBookieClientPool client = lookupClient(addr);
-        if (client == null) {
-            cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
-                    ctx);
-            return;
-        }
-        client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.submit(safeRun(() -> {
-                        cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
-                    }));
-                } catch (RejectedExecutionException re) {
-                    cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
-                            new BookieInfo(), ctx);
-                }
-            } else {
-                pcbc.getBookieInfo(requested, cb, ctx);
-            }
-        }, requested);
-    }
+    /**
+     * Explicitly write the last add confirmed for ledger {@code ledgerId} to the bookie at
+     * {@code address}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger whose last add confirm we wish to know
+     * @param masterKey the master key of the ledger
+     * @param lac the last add confirmed we wish to write
+     * @param toSend a buffer also containing the lac, along with a digest
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void writeLac(BookieSocketAddress address, long ledgerId, byte[] masterKey,
+                  long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx);
 
-    private void monitorPendingOperations() {
-        for (PerChannelBookieClientPool clientPool : channels.values()) {
-            clientPool.checkTimeoutOnPendingOperations();
-        }
-    }
+    /**
+     * Add an entry for ledger {@code ledgerId} on the bookie at address {@code address}.
+     *
+     * @param address the address of the bookie
+     * @param ledgerId the ledger to which we wish to add the entry
+     * @param entryId the id of the entry we wish to add
+     * @param toSend a buffer containing the entry and its digest
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     * @param options a bit mask of flags from BookieProtocol.FLAG_*
+     *                {@link org.apache.bookkeeper.proto.BookieProtocol}
+     * @param allowFastFail fail the add immediately if the channel is non-writable
+     *                      {@link #isWritable(BookieSocketAddress,long)}
+     * @param writeFlags a set of write flags
+     *                   {@link org.apache.bookkeeper.client.api.WriteFlags}
+     */
+    void addEntry(BookieSocketAddress address, long ledgerId, byte[] masterKey,
+                  long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
+                  int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags);
 
-    public boolean isClosed() {
-        return closed;
+    /**
+     * Read entry with a null masterkey, disallowing failfast.
+     * @see #readEntry(BookieSocketAddress,long,long,ReadEntryCallback,Object,int,byte[],boolean)
+     */
+    default void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+                           ReadEntryCallback cb, Object ctx, int flags) {
+        readEntry(address, ledgerId, entryId, cb, ctx, flags, null);
     }
 
-    public void close() {
-        closeLock.writeLock().lock();
-        try {
-            closed = true;
-            for (PerChannelBookieClientPool pool : channels.values()) {
-                pool.close(true);
-            }
-            channels.clear();
-            authProviderFactory.close();
-
-            if (timeoutFuture != null) {
-                timeoutFuture.cancel(false);
-            }
-        } finally {
-            closeLock.writeLock().unlock();
-        }
+    /**
+     * Read entry, disallowing failfast.
+     * @see #readEntry(BookieSocketAddress,long,long,ReadEntryCallback,Object,int,byte[],boolean)
+     */
+    default void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+                           ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey) {
+        readEntry(address, ledgerId, entryId, cb, ctx, flags, masterKey, false);
     }
 
-    private static class Counter {
-        int i;
-        int total;
-
-        synchronized void inc() {
-            i++;
-            total++;
-        }
-
-        synchronized void dec() {
-            i--;
-            notifyAll();
-        }
-
-        synchronized void wait(int limit) throws InterruptedException {
-            while (i > limit) {
-                wait();
-            }
-        }
+    /**
+     * Read an entry from bookie at address {@code address}.
+     *
+     * @param address address of the bookie to read from
+     * @param ledgerId id of the ledger the entry belongs to
+     * @param entryId id of the entry we wish to read
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     * @param flags a bit mask of flags from BookieProtocol.FLAG_*
+     *              {@link org.apache.bookkeeper.proto.BookieProtocol}
+     * @param masterKey the master key of the ledger being read from. This is only required
+     *                  if the FLAG_DO_FENCING is specified.
+     * @param allowFastFail fail the read immediately if the channel is non-writable
+     *                      {@link #isWritable(BookieSocketAddress,long)}
+     */
+    void readEntry(BookieSocketAddress address, long ledgerId, long entryId,
+                   ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
+                   boolean allowFastFail);
 
-        synchronized int total() {
-            return total;
-        }
-    }
+    /**
+     * Send a long poll request to bookie, waiting for the last add confirmed
+     * to be updated. The client can also request that the full entry is returned
+     * with the new last add confirmed.
+     *
+     * @param address address of bookie to send the long poll address to
+     * @param ledgerId ledger whose last add confirmed we are interested in
+     * @param entryId the id of the entry we expect to read
+     * @param previousLAC the previous lac value
+     * @param timeOutInMillis number of millis to wait for LAC update
+     * @param piggyBackEntry whether to read the requested entry when LAC is updated
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     */
+    void readEntryWaitForLACUpdate(BookieSocketAddress address,
+                                   long ledgerId,
+                                   long entryId,
+                                   long previousLAC,
+                                   long timeOutInMillis,
+                                   boolean piggyBackEntry,
+                                   ReadEntryCallback cb,
+                                   Object ctx);
 
     /**
-     * @param args
-     * @throws IOException
-     * @throws NumberFormatException
-     * @throws InterruptedException
+     * Read information about the bookie, from the bookie.
+     *
+     * @param address the address of the bookie to request information from
+     * @param requested a bitset specifying which pieces of information to request
+     *                  {@link org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest}
+     * @param cb the callback notified when the request completes
+     * @param ctx a context object passed to the callback on completion
+     *
+     * @see org.apache.bookkeeper.client.BookieInfoReader.BookieInfo
      */
-    public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
-        if (args.length != 3) {
-            System.err.println("USAGE: BookieClient bookieHost port ledger#");
-            return;
-        }
-        WriteCallback cb = new WriteCallback() {
+    void getBookieInfo(BookieSocketAddress address, long requested,
+                       GetBookieInfoCallback cb, Object ctx);
 
-            public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) {
-                Counter counter = (Counter) ctx;
-                counter.dec();
-                if (rc != 0) {
-                    System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
-                }
-            }
-        };
-        Counter counter = new Counter();
-        byte hello[] = "hello".getBytes(UTF_8);
-        long ledger = Long.parseLong(args[2]);
-        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
-        OrderedExecutor executor = OrderedExecutor.newBuilder()
-                .name("BookieClientWorker")
-                .numThreads(1)
-                .build();
-        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory("BookKeeperClientScheduler"));
-        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
-        BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
+    /**
+     * @return whether bookie client object has been closed
+     */
+    boolean isClosed();
 
-        for (int i = 0; i < 100000; i++) {
-            counter.inc();
-            bc.addEntry(addr, ledger, new byte[0], i,
-                    ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0, false,
-                    WriteFlag.NONE);
-        }
-        counter.wait(0);
-        System.out.println("Total = " + counter.total());
-        scheduler.shutdown();
-        eventLoopGroup.shutdownGracefully();
-        executor.shutdown();
-    }
+    /**
+     * Close the bookie client object.
+     */
+    void close();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
new file mode 100644
index 0000000000..50dd85fd83
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -0,0 +1,611 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ExtensionRegistry;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.tls.SecurityException;
+import org.apache.bookkeeper.tls.SecurityHandlerFactory;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the client-side part of the BookKeeper protocol.
+ *
+ */
+public class BookieClientImpl implements BookieClient, PerChannelBookieClientFactory {
+    static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
+
+    OrderedExecutor executor;
+    ScheduledExecutorService scheduler;
+    ScheduledFuture<?> timeoutFuture;
+
+    EventLoopGroup eventLoopGroup;
+    final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
+            new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
+
+    private final ClientAuthProvider.Factory authProviderFactory;
+    private final ExtensionRegistry registry;
+
+    private final ClientConfiguration conf;
+    private volatile boolean closed;
+    private final ReentrantReadWriteLock closeLock;
+    private final StatsLogger statsLogger;
+    private final int numConnectionsPerBookie;
+
+    private final long bookieErrorThresholdPerInterval;
+
+    public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
+                            OrderedExecutor executor, ScheduledExecutorService scheduler,
+                            StatsLogger statsLogger) throws IOException {
+        this.conf = conf;
+        this.eventLoopGroup = eventLoopGroup;
+        this.executor = executor;
+        this.closed = false;
+        this.closeLock = new ReentrantReadWriteLock();
+
+        this.registry = ExtensionRegistry.newInstance();
+        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf);
+
+        this.statsLogger = statsLogger;
+        this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
+        this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval();
+
+        this.scheduler = scheduler;
+        if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
+            SafeRunnable monitor = safeRun(() -> {
+                monitorPendingOperations();
+            });
+            this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
+                                                                    conf.getTimeoutMonitorIntervalSec(),
+                                                                    conf.getTimeoutMonitorIntervalSec(),
+                                                                    TimeUnit.SECONDS);
+        }
+    }
+
+    private int getRc(int rc) {
+        if (BKException.Code.OK == rc) {
+            return rc;
+        } else {
+            if (closed) {
+                return BKException.Code.ClientClosedException;
+            } else {
+                return rc;
+            }
+        }
+    }
+
+    @Override
+    public List<BookieSocketAddress> getFaultyBookies() {
+        List<BookieSocketAddress> faultyBookies = Lists.newArrayList();
+        for (PerChannelBookieClientPool channelPool : channels.values()) {
+            if (channelPool instanceof DefaultPerChannelBookieClientPool) {
+                DefaultPerChannelBookieClientPool pool = (DefaultPerChannelBookieClientPool) channelPool;
+                if (pool.errorCounter.getAndSet(0) >= bookieErrorThresholdPerInterval) {
+                    faultyBookies.add(pool.address);
+                }
+            }
+        }
+        return faultyBookies;
+    }
+
+    @Override
+    public boolean isWritable(BookieSocketAddress address, long key) {
+        final PerChannelBookieClientPool pcbcPool = lookupClient(address);
+        // if null, let the write initiate connect of fail with whatever error it produces
+        return pcbcPool == null || pcbcPool.isWritable(key);
+    }
+
+    @Override
+    public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) {
+        PerChannelBookieClientPool pcbcPool = lookupClient(address);
+        if (pcbcPool == null) {
+            return 0;
+        } else if (pcbcPool.isWritable(ledgerId)) {
+            return pcbcPool.getNumPendingCompletionRequests();
+        } else {
+            return pcbcPool.getNumPendingCompletionRequests() | PENDINGREQ_NOTWRITABLE_MASK;
+        }
+    }
+
+    @Override
+    public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
+            SecurityHandlerFactory shFactory) throws SecurityException {
+        return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger,
+                                          authProviderFactory, registry, pcbcPool, shFactory);
+    }
+
+    public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
+        PerChannelBookieClientPool clientPool = channels.get(addr);
+        if (null == clientPool) {
+            closeLock.readLock().lock();
+            try {
+                if (closed) {
+                    return null;
+                }
+                PerChannelBookieClientPool newClientPool =
+                    new DefaultPerChannelBookieClientPool(conf, this, addr, numConnectionsPerBookie);
+                PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
+                if (null == oldClientPool) {
+                    clientPool = newClientPool;
+                    // initialize the pool only after we put the pool into the map
+                    clientPool.intialize();
+                } else {
+                    clientPool = oldClientPool;
+                    newClientPool.close(false);
+                }
+            } catch (SecurityException e) {
+                LOG.error("Security Exception in creating new default PCBC pool: ", e);
+                return null;
+            } finally {
+                closeLock.readLock().unlock();
+            }
+        }
+        return clientPool;
+    }
+
+    @Override
+    public void forceLedger(final BookieSocketAddress addr, final long ledgerId,
+            final ForceLedgerCallback cb, final Object ctx) {
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            cb.forceLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                              ledgerId, addr, ctx);
+            return;
+        }
+
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
+                        cb.forceLedgerComplete(rc, ledgerId, addr, ctx);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
+                }
+            } else {
+                pcbc.forceLedger(ledgerId, cb, ctx);
+            }
+        }, ledgerId);
+    }
+
+    @Override
+    public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
+            final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                              ledgerId, addr, ctx);
+            return;
+        }
+
+        toSend.retain();
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
+                        cb.writeLacComplete(rc, ledgerId, addr, ctx);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
+                }
+            } else {
+                pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+            }
+
+            toSend.release();
+        }, ledgerId);
+    }
+
+    private void completeAdd(final int rc,
+                             final long ledgerId,
+                             final long entryId,
+                             final BookieSocketAddress addr,
+                             final WriteCallback cb,
+                             final Object ctx) {
+        try {
+            executor.executeOrdered(ledgerId, new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                }
+                @Override
+                public String toString() {
+                    return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
+                }
+            });
+        } catch (RejectedExecutionException ree) {
+            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
+        }
+    }
+
+    @Override
+    public void addEntry(final BookieSocketAddress addr,
+                         final long ledgerId,
+                         final byte[] masterKey,
+                         final long entryId,
+                         final ByteBufList toSend,
+                         final WriteCallback cb,
+                         final Object ctx,
+                         final int options,
+                         final boolean allowFastFail,
+                         final EnumSet<WriteFlag> writeFlags) {
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
+                        ledgerId, entryId, addr, cb, ctx);
+            return;
+        }
+
+        // Retain the buffer, since the connection could be obtained after
+        // the PendingApp might have already failed
+        toSend.retain();
+
+        client.obtain(ChannelReadyForAddEntryCallback.create(
+                              this, toSend, ledgerId, entryId, addr,
+                                  ctx, cb, options, masterKey, allowFastFail, writeFlags),
+                      ledgerId);
+    }
+
+    private void completeRead(final int rc,
+                              final long ledgerId,
+                              final long entryId,
+                              final ByteBuf entry,
+                              final ReadEntryCallback cb,
+                              final Object ctx) {
+        try {
+            executor.executeOrdered(ledgerId, new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx);
+                }
+            });
+        } catch (RejectedExecutionException ree) {
+            cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
+                                 ledgerId, entryId, entry, ctx);
+        }
+    }
+
+    private static class ChannelReadyForAddEntryCallback
+        implements GenericCallback<PerChannelBookieClient> {
+        private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
+
+        private BookieClientImpl bookieClient;
+        private ByteBufList toSend;
+        private long ledgerId;
+        private long entryId;
+        private BookieSocketAddress addr;
+        private Object ctx;
+        private WriteCallback cb;
+        private int options;
+        private byte[] masterKey;
+        private boolean allowFastFail;
+        private EnumSet<WriteFlag> writeFlags;
+
+        static ChannelReadyForAddEntryCallback create(
+                BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
+                long entryId, BookieSocketAddress addr, Object ctx,
+                WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
+                EnumSet<WriteFlag> writeFlags) {
+            ChannelReadyForAddEntryCallback callback = RECYCLER.get();
+            callback.bookieClient = bookieClient;
+            callback.toSend = toSend;
+            callback.ledgerId = ledgerId;
+            callback.entryId = entryId;
+            callback.addr = addr;
+            callback.ctx = ctx;
+            callback.cb = cb;
+            callback.options = options;
+            callback.masterKey = masterKey;
+            callback.allowFastFail = allowFastFail;
+            callback.writeFlags = writeFlags;
+            return callback;
+        }
+
+        @Override
+        public void operationComplete(final int rc,
+                                      PerChannelBookieClient pcbc) {
+            if (rc != BKException.Code.OK) {
+                bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
+            } else {
+                pcbc.addEntry(ledgerId, masterKey, entryId,
+                              toSend, cb, ctx, options, allowFastFail, writeFlags);
+            }
+
+            toSend.release();
+            recycle();
+        }
+
+        private ChannelReadyForAddEntryCallback(
+                Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<ChannelReadyForAddEntryCallback> RECYCLER =
+            new Recycler<ChannelReadyForAddEntryCallback>() {
+                    protected ChannelReadyForAddEntryCallback newObject(
+                            Recycler.Handle<ChannelReadyForAddEntryCallback> recyclerHandle) {
+                        return new ChannelReadyForAddEntryCallback(recyclerHandle);
+                    }
+                };
+
+        public void recycle() {
+            bookieClient = null;
+            toSend = null;
+            ledgerId = -1;
+            entryId = -1;
+            addr = null;
+            ctx = null;
+            cb = null;
+            options = -1;
+            masterKey = null;
+            allowFastFail = false;
+            writeFlags = null;
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
+            final Object ctx) {
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
+                    ctx);
+            return;
+        }
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
+                        cb.readLacComplete(rc, ledgerId, null, null, ctx);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    cb.readLacComplete(getRc(BKException.Code.InterruptedException),
+                            ledgerId, null, null, ctx);
+                }
+            } else {
+                pcbc.readLac(ledgerId, cb, ctx);
+            }
+        }, ledgerId);
+    }
+
+    public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
+                          ReadEntryCallback cb, Object ctx, int flags) {
+        readEntry(addr, ledgerId, entryId, cb, ctx, flags, null);
+    }
+
+    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
+                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
+        readEntry(addr, ledgerId, entryId, cb, ctx, flags, masterKey, false);
+    }
+
+    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
+                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
+                          final boolean allowFastFail) {
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                                 ledgerId, entryId, null, ctx);
+            return;
+        }
+
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                completeRead(rc, ledgerId, entryId, null, cb, ctx);
+            } else {
+                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
+            }
+        }, ledgerId);
+    }
+
+
+    public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
+                                          final long ledgerId,
+                                          final long entryId,
+                                          final long previousLAC,
+                                          final long timeOutInMillis,
+                                          final boolean piggyBackEntry,
+                                          final ReadEntryCallback cb,
+                                          final Object ctx) {
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            completeRead(BKException.Code.BookieHandleNotAvailableException,
+                    ledgerId, entryId, null, cb, ctx);
+            return;
+        }
+
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                completeRead(rc, ledgerId, entryId, null, cb, ctx);
+            } else {
+                pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
+                        ctx);
+            }
+        }, ledgerId);
+    }
+
+    public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb,
+            final Object ctx) {
+        final PerChannelBookieClientPool client = lookupClient(addr);
+        if (client == null) {
+            cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
+                    ctx);
+            return;
+        }
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.submit(safeRun(() -> {
+                        cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
+                            new BookieInfo(), ctx);
+                }
+            } else {
+                pcbc.getBookieInfo(requested, cb, ctx);
+            }
+        }, requested);
+    }
+
+    private void monitorPendingOperations() {
+        for (PerChannelBookieClientPool clientPool : channels.values()) {
+            clientPool.checkTimeoutOnPendingOperations();
+        }
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void close() {
+        closeLock.writeLock().lock();
+        try {
+            closed = true;
+            for (PerChannelBookieClientPool pool : channels.values()) {
+                pool.close(true);
+            }
+            channels.clear();
+            authProviderFactory.close();
+
+            if (timeoutFuture != null) {
+                timeoutFuture.cancel(false);
+            }
+        } finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+    private static class Counter {
+        int i;
+        int total;
+
+        synchronized void inc() {
+            i++;
+            total++;
+        }
+
+        synchronized void dec() {
+            i--;
+            notifyAll();
+        }
+
+        synchronized void wait(int limit) throws InterruptedException {
+            while (i > limit) {
+                wait();
+            }
+        }
+
+        synchronized int total() {
+            return total;
+        }
+    }
+
+    /**
+     * @param args
+     * @throws IOException
+     * @throws NumberFormatException
+     * @throws InterruptedException
+     */
+    public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
+        if (args.length != 3) {
+            System.err.println("USAGE: BookieClient bookieHost port ledger#");
+            return;
+        }
+        WriteCallback cb = new WriteCallback() {
+
+            public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) {
+                Counter counter = (Counter) ctx;
+                counter.dec();
+                if (rc != 0) {
+                    System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
+                }
+            }
+        };
+        Counter counter = new Counter();
+        byte hello[] = "hello".getBytes(UTF_8);
+        long ledger = Long.parseLong(args[2]);
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
+        OrderedExecutor executor = OrderedExecutor.newBuilder()
+                .name("BookieClientWorker")
+                .numThreads(1)
+                .build();
+        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("BookKeeperClientScheduler"));
+        BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+                                                   scheduler, NullStatsLogger.INSTANCE);
+        BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
+
+        for (int i = 0; i < 100000; i++) {
+            counter.inc();
+            bc.addEntry(addr, ledger, new byte[0], i,
+                    ByteBufList.get(Unpooled.wrappedBuffer(hello)), cb, counter, 0, false,
+                    WriteFlag.NONE);
+        }
+        counter.wait(0);
+        System.out.println("Total = " + counter.total());
+        scheduler.shutdown();
+        eventLoopGroup.shutdownGracefully();
+        executor.shutdown();
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 7c6f9a0cb9..71a9b11a46 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -34,6 +34,7 @@
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
@@ -296,7 +297,7 @@ private LedgerHandle doBackpressureTest(byte[] entry, ClientConfiguration conf,
 
     private void setTargetChannelState(BookKeeper bkc, BookieSocketAddress address,
                                        long key, boolean state) throws Exception {
-        bkc.getBookieClient().lookupClient(address).obtain((rc, pcbc) -> {
+        ((BookieClientImpl) bkc.getBookieClient()).lookupClient(address).obtain((rc, pcbc) -> {
             pcbc.setWritable(state);
         }, key);
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index a91dffae3e..6291413614 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -38,6 +38,7 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -107,7 +108,7 @@ public void testGetBookieInfoTimeout() throws Exception {
         // try to get bookie info from the sleeping bookie. It should fail with timeout error
         BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
                 bookieToSleep.getPort());
-        BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
new file mode 100644
index 0000000000..79cc5ba3f5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.SafeRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock implementation of BookieClient.
+ */
+public class MockBookieClient implements BookieClient {
+    static final Logger LOG = LoggerFactory.getLogger(MockBookieClient.class);
+
+    final OrderedExecutor executor;
+    final ConcurrentHashMap<BookieSocketAddress, ConcurrentHashMap<Long, LedgerData>> data = new ConcurrentHashMap<>();
+    final Set<BookieSocketAddress> errorBookies =
+        Collections.newSetFromMap(new ConcurrentHashMap<BookieSocketAddress, Boolean>());
+
+    final Map<BookieSocketAddress, Boolean> stalledBookies = new HashMap<>();
+    final Map<BookieSocketAddress, List<Consumer<Integer>>> stalledRequests = new HashMap<>();
+
+    public MockBookieClient(OrderedExecutor executor) {
+        this.executor = executor;
+    }
+
+    public void stallBookie(BookieSocketAddress bookie) {
+        synchronized (this) {
+            stalledBookies.put(bookie, true);
+        }
+    }
+
+    public void releaseStalledBookie(BookieSocketAddress bookie, int rc) {
+        synchronized (this) {
+            stalledBookies.remove(bookie);
+            stalledRequests.remove(bookie).forEach((r) -> r.accept(rc));
+        }
+    }
+
+    public void errorBookies(BookieSocketAddress... bookies) {
+        for (BookieSocketAddress b : bookies) {
+            errorBookies.add(b);
+        }
+    }
+
+    public void removeErrors(BookieSocketAddress... bookies) {
+        for (BookieSocketAddress b : bookies) {
+            errorBookies.remove(b);
+        }
+    }
+
+    @Override
+    public List<BookieSocketAddress> getFaultyBookies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isWritable(BookieSocketAddress address, long ledgerId) {
+        return true;
+    }
+
+    @Override
+    public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) {
+        return 0;
+    }
+
+    @Override
+    public void forceLedger(BookieSocketAddress addr, long ledgerId,
+                            ForceLedgerCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.forceLedgerComplete(BKException.Code.IllegalOpException,
+                                               ledgerId, addr, ctx);
+                    }));
+    }
+
+    @Override
+    public void writeLac(BookieSocketAddress addr, long ledgerId, byte[] masterKey,
+                         long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.writeLacComplete(BKException.Code.IllegalOpException,
+                                               ledgerId, addr, ctx);
+                    }));
+    }
+
+    @Override
+    public void addEntry(BookieSocketAddress addr, long ledgerId, byte[] masterKey,
+                         long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
+                         int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) {
+        SafeRunnable write = safeRun(() -> {
+                LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
+                if (errorBookies.contains(addr)) {
+                    LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
+                    cb.writeComplete(BKException.Code.WriteException, ledgerId, entryId, addr, ctx);
+                    return;
+                }
+                LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
+                ledger.addEntry(entryId, copyData(toSend));
+                cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
+                toSend.release();
+            });
+
+        toSend.retain();
+        synchronized (this) {
+            if (stalledBookies.getOrDefault(addr, false)) {
+                LOG.info("[{};{};{}] Stalling write {}", addr, ledgerId, System.identityHashCode(write), entryId);
+                stalledRequests.computeIfAbsent(addr, (key) -> new ArrayList<>())
+                    .add((rc) -> {
+                            LOG.info("[{};{};{}] Unstalled write {}",
+                                     addr, ledgerId, System.identityHashCode(write), entryId);
+                            if (rc == BKException.Code.OK) {
+                                executor.executeOrdered(ledgerId, write);
+                            } else {
+                                executor.executeOrdered(
+                                        ledgerId, safeRun(() -> {
+                                            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                                            toSend.release();
+                                        }));
+                            }
+                        });
+            } else {
+                executor.executeOrdered(ledgerId, write);
+            }
+        }
+    }
+
+    @Override
+    public void readLac(BookieSocketAddress addr, long ledgerId, ReadLacCallback cb, Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.readLacComplete(BKException.Code.IllegalOpException,
+                                           ledgerId, null, null, ctx);
+                    }));
+    }
+
+    @Override
+    public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
+                          ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
+                          boolean allowFastFail) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId);
+                        if (errorBookies.contains(addr)) {
+                            LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId);
+                            cb.readEntryComplete(BKException.Code.ReadException, ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        LedgerData ledger = getBookieData(addr).get(ledgerId);
+                        if (ledger == null) {
+                            LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
+                            cb.readEntryComplete(BKException.Code.NoSuchLedgerExistsException,
+                                                 ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        ByteBuf entry = ledger.getEntry(entryId);
+                        if (entry == null) {
+                            LOG.warn("[{};L{}] entry({}) not found", addr, ledgerId, entryId);
+                            cb.readEntryComplete(BKException.Code.NoSuchEntryException,
+                                                 ledgerId, entryId, null, ctx);
+                            return;
+                        }
+
+                        cb.readEntryComplete(BKException.Code.OK,
+                                             ledgerId, entryId, entry.slice(), ctx);
+                    }));
+    }
+
+    @Override
+    public void readEntryWaitForLACUpdate(BookieSocketAddress addr,
+                                          long ledgerId,
+                                          long entryId,
+                                          long previousLAC,
+                                          long timeOutInMillis,
+                                          boolean piggyBackEntry,
+                                          ReadEntryCallback cb,
+                                          Object ctx) {
+        executor.executeOrdered(ledgerId,
+                safeRun(() -> {
+                        cb.readEntryComplete(BKException.Code.IllegalOpException,
+                                             ledgerId, entryId, null, ctx);
+                    }));
+    }
+
+    @Override
+    public void getBookieInfo(BookieSocketAddress addr, long requested,
+                              GetBookieInfoCallback cb, Object ctx) {
+        executor.executeOrdered(addr,
+                safeRun(() -> {
+                        cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
+                                                 null, ctx);
+                    }));
+    }
+
+    @Override
+    public boolean isClosed() {
+        return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private ConcurrentHashMap<Long, LedgerData> getBookieData(BookieSocketAddress addr) {
+        return data.computeIfAbsent(addr, (key) -> new ConcurrentHashMap<>());
+    }
+
+    private static ByteBuf copyData(ByteBufList list) {
+        ByteBuf buf = Unpooled.buffer(list.readableBytes());
+        for (int i = 0; i < list.size(); i++) {
+            buf.writeBytes(list.getBuffer(i).slice());
+        }
+        return buf;
+    }
+
+    private static class LedgerData {
+        final long ledgerId;
+        private TreeMap<Long, ByteBuf> entries = new TreeMap<>();
+        LedgerData(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        void addEntry(long entryId, ByteBuf entry) {
+            entries.put(entryId, entry);
+        }
+
+        ByteBuf getEntry(long entryId) {
+            if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+                Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
+                if (lastEntry != null) {
+                    return lastEntry.getValue();
+                } else {
+                    return null;
+                }
+            } else {
+                return entries.get(entryId);
+            }
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 7ff638675f..c6cc72bcd8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -47,6 +47,7 @@
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
@@ -156,8 +157,8 @@ public void testWriteGaps() throws Exception {
         BookieSocketAddress addr = bs.getLocalAddress();
         ResultStruct arc = new ResultStruct();
 
-        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+                                               scheduler, NullStatsLogger.INSTANCE);
         ByteBufList bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
         synchronized (arc) {
@@ -257,8 +258,8 @@ private ByteBufList createByteBuffer(int i, long lid, long eid) {
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
+                                               scheduler, NullStatsLogger.INSTANCE);
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
@@ -269,8 +270,8 @@ public void testNoLedger() throws Exception {
     @Test
     public void testGetBookieInfo() throws IOException, InterruptedException {
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClient(new ClientConfiguration(), new NioEventLoopGroup(), executor,
-                                           scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(), executor,
+                                               scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services