You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/21 11:23:23 UTC

[GitHub] ivankelly closed pull request #1589: Make LedgerHandle injectable

ivankelly closed pull request #1589: Make LedgerHandle injectable
URL: https://github.com/apache/bookkeeper/pull/1589
 
 
   

This pull request was merged from a forked repository. Because GitHub does
not display the original diff of a fork-based pull request once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 1d909b6ad6..4fc7728bdc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -24,7 +24,6 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -60,7 +59,6 @@
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.meta.CleanupLedgerManager;
@@ -77,9 +75,7 @@
 import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.DataFormats;
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -110,24 +106,7 @@
 
     // The stats logger for this client.
     private final StatsLogger statsLogger;
-    private OpStatsLogger createOpLogger;
-    private OpStatsLogger openOpLogger;
-    private OpStatsLogger deleteOpLogger;
-    private OpStatsLogger recoverOpLogger;
-    private OpStatsLogger readOpLogger;
-    private OpStatsLogger readLacAndEntryOpLogger;
-    private OpStatsLogger readLacAndEntryRespLogger;
-    private OpStatsLogger addOpLogger;
-    private OpStatsLogger forceOpLogger;
-    private OpStatsLogger writeLacOpLogger;
-    private OpStatsLogger readLacOpLogger;
-    private OpStatsLogger recoverAddEntriesStats;
-    private OpStatsLogger recoverReadEntriesStats;
-
-    private Counter speculativeReadCounter;
-    private Counter readOpDmCounter;
-    private Counter addOpUrCounter;
-
+    private final BookKeeperClientStats clientStats;
 
     // whether the event loop group is one we created, or is owned by whoever
     // instantiated us
@@ -143,9 +122,6 @@
     final FeatureProvider featureProvider;
     final ScheduledExecutorService bookieInfoScheduler;
 
-    // Features
-    final Feature disableEnsembleChangeFeature;
-
     final MetadataClientDriver metadataDriver;
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
@@ -157,13 +133,7 @@
     BookieInfoReader bookieInfoReader;
 
     final ClientConfiguration conf;
-    final int explicitLacInterval;
-    final boolean delayEnsembleChange;
-    final boolean reorderReadSequence;
-    final long addEntryQuorumTimeoutNanos;
-
-    final Optional<SpeculativeRequestExecutionPolicy> readSpeculativeRequestPolicy;
-    final Optional<SpeculativeRequestExecutionPolicy> readLACSpeculativeRequestPolicy;
+    final ClientInternalConf internalConf;
 
     // Close State
     boolean closed = false;
@@ -410,37 +380,34 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
     BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
                        EventLoopGroup eventLoopGroup,
-                       StatsLogger statsLogger,
+                       StatsLogger rootStatsLogger,
                        DNSToSwitchMapping dnsResolver,
                        HashedWheelTimer requestTimer,
                        FeatureProvider featureProvider)
             throws IOException, InterruptedException, BKException {
         this.conf = conf;
-        this.delayEnsembleChange = conf.getDelayEnsembleChange();
-        this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
+        // initialize feature provider
+        if (null == featureProvider) {
+            this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
+        } else {
+            this.featureProvider = featureProvider;
+        }
+
+        this.internalConf = ClientInternalConf.fromConfigAndFeatureProvider(conf, this.featureProvider);
 
         // initialize resources
         this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
         this.mainWorkerPool = OrderedExecutor.newBuilder()
                 .name("BookKeeperClientWorker")
                 .numThreads(conf.getNumWorkerThreads())
-                .statsLogger(statsLogger)
+                .statsLogger(rootStatsLogger)
                 .traceTaskExecution(conf.getEnableTaskExecutionStats())
                 .traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
                 .build();
 
         // initialize stats logger
-        this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
-        initOpLoggers(this.statsLogger);
-
-        // initialize feature provider
-        if (null == featureProvider) {
-            this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
-        } else {
-            this.featureProvider = featureProvider;
-        }
-        this.disableEnsembleChangeFeature =
-            this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
+        this.statsLogger = rootStatsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
+        this.clientStats = BookKeeperClientStats.newInstance(this.statsLogger);
 
         // initialize metadata driver
         try {
@@ -454,7 +421,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
             this.metadataDriver.initialize(
                 conf,
                 scheduler,
-                statsLogger,
+                rootStatsLogger,
                 java.util.Optional.ofNullable(zkc));
         } catch (ConfigurationException ce) {
             LOG.error("Failed to initialize metadata client driver using invalid metadata service uri", ce);
@@ -488,28 +455,9 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
         this.placementPolicy = initializeEnsemblePlacementPolicy(conf,
                 dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger);
 
-        if (conf.getFirstSpeculativeReadTimeout() > 0) {
-            this.readSpeculativeRequestPolicy =
-                    Optional.of(new DefaultSpeculativeRequestExecutionPolicy(
-                        conf.getFirstSpeculativeReadTimeout(),
-                        conf.getMaxSpeculativeReadTimeout(),
-                        conf.getSpeculativeReadTimeoutBackoffMultiplier()));
-        } else {
-            this.readSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
-        }
-
-        if (conf.getFirstSpeculativeReadLACTimeout() > 0) {
-            this.readLACSpeculativeRequestPolicy =
-                    Optional.of((SpeculativeRequestExecutionPolicy) (new DefaultSpeculativeRequestExecutionPolicy(
-                        conf.getFirstSpeculativeReadLACTimeout(),
-                        conf.getMaxSpeculativeReadLACTimeout(),
-                        conf.getSpeculativeReadLACTimeoutBackoffMultiplier())));
-        } else {
-            this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
-        }
         // initialize bookie client
         this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.mainWorkerPool,
-                                                 scheduler, statsLogger);
+                                                 scheduler, rootStatsLogger);
         this.bookieWatcher = new BookieWatcherImpl(
                 conf, this.placementPolicy, metadataDriver.getRegistrationClient(),
                 this.statsLogger.scope(WATCHER_SCOPE));
@@ -537,13 +485,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
         }
         this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
         this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
-        this.explicitLacInterval = conf.getExplictLacInterval();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval);
-        }
 
-        this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
-        scheduleBookieHealthCheckIfEnabled();
+        scheduleBookieHealthCheckIfEnabled(conf);
     }
 
     /**
@@ -551,13 +494,13 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
      */
     @VisibleForTesting
     BookKeeper() {
+        conf = new ClientConfiguration();
+        internalConf = ClientInternalConf.fromConfig(conf);
         statsLogger = NullStatsLogger.INSTANCE;
+        clientStats = BookKeeperClientStats.newInstance(statsLogger);
         scheduler = null;
         requestTimer = null;
-        reorderReadSequence = false;
         metadataDriver = null;
-        readSpeculativeRequestPolicy = Optional.absent();
-        readLACSpeculativeRequestPolicy = Optional.absent();
         placementPolicy = null;
         ownTimer = false;
         mainWorkerPool = null;
@@ -565,24 +508,10 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
         ledgerManager = null;
         ledgerIdGenerator = null;
         featureProvider = null;
-        explicitLacInterval = 0;
         eventLoopGroup = null;
-        disableEnsembleChangeFeature = null;
-        delayEnsembleChange = false;
-        conf = new ClientConfiguration();
         bookieWatcher = null;
         bookieInfoScheduler = null;
         bookieClient = null;
-        addEntryQuorumTimeoutNanos = 0;
-    }
-
-    long getAddEntryQuorumTimeoutNanos() {
-        return addEntryQuorumTimeoutNanos;
-    }
-
-
-    public int getExplicitLacInterval() {
-        return explicitLacInterval;
     }
 
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
@@ -601,6 +530,10 @@ private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfigur
     }
 
     int getReturnRc(int rc) {
+        return getReturnRc(bookieClient, rc);
+    }
+
+    static int getReturnRc(BookieClient bookieClient, int rc) {
         if (BKException.Code.OK == rc) {
             return rc;
         } else {
@@ -612,7 +545,7 @@ int getReturnRc(int rc) {
         }
     }
 
-    void scheduleBookieHealthCheckIfEnabled() {
+    void scheduleBookieHealthCheckIfEnabled(ClientConfiguration conf) {
         if (conf.isBookieHealthCheckEnabled()) {
             scheduler.scheduleAtFixedRate(new SafeRunnable() {
 
@@ -635,10 +568,6 @@ void checkForFaultyBookies() {
     /**
      * Returns ref to speculative read counter, needed in PendingReadOp.
      */
-    Counter getSpeculativeReadCounter() {
-        return speculativeReadCounter;
-    }
-
     @VisibleForTesting
     public LedgerManager getLedgerManager() {
         return ledgerManager;
@@ -684,11 +613,6 @@ EnsemblePlacementPolicy getPlacementPolicy() {
         return placementPolicy;
     }
 
-    @VisibleForTesting
-    boolean isReorderReadSequence() {
-        return reorderReadSequence;
-    }
-
     @VisibleForTesting
     public MetadataClientDriver getMetadataClientDriver() {
         return metadataDriver;
@@ -754,10 +678,6 @@ public static DigestType fromApiDigestType(org.apache.bookkeeper.client.api.Dige
         }
     }
 
-    boolean shouldReorderReadSequence() {
-        return reorderReadSequence;
-    }
-
     ZooKeeper getZkHandle() {
         return ((ZKMetadataClientDriver) metadataDriver).getZk();
     }
@@ -770,23 +690,6 @@ StatsLogger getStatsLogger() {
         return statsLogger;
     }
 
-    public Optional<SpeculativeRequestExecutionPolicy> getReadSpeculativeRequestPolicy() {
-        return readSpeculativeRequestPolicy;
-    }
-
-    public Optional<SpeculativeRequestExecutionPolicy> getReadLACSpeculativeRequestPolicy() {
-        return readLACSpeculativeRequestPolicy;
-    }
-
-    /**
-     * Get the disableEnsembleChangeFeature.
-     *
-     * @return disableEnsembleChangeFeature for the BookKeeper instance.
-     */
-    Feature getDisableEnsembleChangeFeature() {
-        return disableEnsembleChangeFeature;
-    }
-
     /**
      * Get the BookieClient, currently used for doing bookie recovery.
      *
@@ -887,7 +790,7 @@ public void asyncCreateLedger(final int ensSize, final int writeQuorumSize, fina
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
                                ackQuorumSize, digestType, passwd, cb, ctx,
-                               customMetadata, WriteFlag.NONE, getStatsLogger())
+                               customMetadata, WriteFlag.NONE, clientStats)
                 .initiate();
         } finally {
             closeLock.readLock().unlock();
@@ -1085,7 +988,7 @@ public void asyncCreateLedgerAdv(final int ensSize, final int writeQuorumSize, f
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
                                ackQuorumSize, digestType, passwd, cb, ctx,
-                               customMetadata, WriteFlag.NONE, getStatsLogger())
+                               customMetadata, WriteFlag.NONE, clientStats)
                                        .initiateAdv(-1L);
         } finally {
             closeLock.readLock().unlock();
@@ -1193,7 +1096,7 @@ public void asyncCreateLedgerAdv(final long ledgerId,
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
                                ackQuorumSize, digestType, passwd, cb, ctx,
-                               customMetadata, WriteFlag.NONE, getStatsLogger())
+                               customMetadata, WriteFlag.NONE, clientStats)
                     .initiateAdv(ledgerId);
         } finally {
             closeLock.readLock().unlock();
@@ -1234,7 +1137,8 @@ public void asyncOpenLedger(final long lId, final DigestType digestType, final b
                 cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
                 return;
             }
-            new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
+            new LedgerOpenOp(BookKeeper.this, clientStats,
+                             lId, digestType, passwd, cb, ctx).initiate();
         } finally {
             closeLock.readLock().unlock();
         }
@@ -1278,7 +1182,8 @@ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestTyp
                 cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
                 return;
             }
-            new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
+            new LedgerOpenOp(BookKeeper.this, clientStats,
+                             lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
         } finally {
             closeLock.readLock().unlock();
         }
@@ -1357,7 +1262,7 @@ public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Obj
                 cb.deleteComplete(BKException.Code.ClientClosedException, ctx);
                 return;
             }
-            new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
+            new LedgerDeleteOp(BookKeeper.this, clientStats, lId, cb, ctx).initiate();
         } finally {
             closeLock.readLock().unlock();
         }
@@ -1498,72 +1403,6 @@ public void close() throws BKException, InterruptedException {
         this.metadataDriver.close();
     }
 
-    private void initOpLoggers(StatsLogger stats) {
-        createOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CREATE_OP);
-        deleteOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.DELETE_OP);
-        openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
-        recoverOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.RECOVER_OP);
-        readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
-        readOpDmCounter = stats.getCounter(BookKeeperClientStats.READ_OP_DM);
-        readLacAndEntryOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY);
-        readLacAndEntryRespLogger = stats.getOpStatsLogger(
-                BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
-        addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
-        forceOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.FORCE_OP);
-        addOpUrCounter = stats.getCounter(BookKeeperClientStats.ADD_OP_UR);
-        writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
-        readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
-        recoverAddEntriesStats = stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_ADD_ENTRIES);
-        recoverReadEntriesStats = stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_READ_ENTRIES);
-
-        speculativeReadCounter = stats.getCounter(BookKeeperClientStats.SPECULATIVE_READ_COUNT);
-    }
-
-    OpStatsLogger getCreateOpLogger() {
-        return createOpLogger;
-    }
-    OpStatsLogger getOpenOpLogger() {
-        return openOpLogger;
-    }
-    OpStatsLogger getDeleteOpLogger() {
-        return deleteOpLogger;
-    }
-    OpStatsLogger getRecoverOpLogger() {
-        return recoverOpLogger;
-    }
-    OpStatsLogger getReadOpLogger() {
-        return readOpLogger;
-    }
-    OpStatsLogger getReadLacAndEntryOpLogger() {
-        return readLacAndEntryOpLogger;
-    }
-    OpStatsLogger getReadLacAndEntryRespLogger() {
-        return readLacAndEntryRespLogger;
-    }
-    OpStatsLogger getAddOpLogger() {
-        return addOpLogger;
-    }
-    OpStatsLogger getForceOpLogger() {
-        return forceOpLogger;
-    }
-    OpStatsLogger getWriteLacOpLogger() {
-        return writeLacOpLogger;
-    }
-    OpStatsLogger getReadLacOpLogger() {
-        return readLacOpLogger;
-    }
-    OpStatsLogger getRecoverAddCountLogger() {
-        return recoverAddEntriesStats;
-    }
-    OpStatsLogger getRecoverReadCountLogger() {
-        return recoverReadEntriesStats;
-    }
-    Counter getReadOpDmCounter() {
-        return readOpDmCounter;
-    }
-    Counter getAddOpUrCounter() {
-        return addOpUrCounter;
-    }
     static EventLoopGroup getDefaultEventLoopGroup() {
         ThreadFactory threadFactory = new DefaultThreadFactory("bookkeeper-io");
         final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
@@ -1595,4 +1434,54 @@ public DeleteBuilder newDeleteLedgerOp() {
         return new LedgerDeleteOp.DeleteBuilderImpl(this);
     }
 
+    private final ClientContext clientCtx = new ClientContext() {
+            @Override
+            public ClientInternalConf getConf() {
+                return internalConf;
+            }
+
+            @Override
+            public LedgerManager getLedgerManager() {
+                return BookKeeper.this.getLedgerManager();
+            }
+
+            @Override
+            public BookieWatcher getBookieWatcher() {
+                return BookKeeper.this.getBookieWatcher();
+            }
+
+            @Override
+            public EnsemblePlacementPolicy getPlacementPolicy() {
+                return BookKeeper.this.getPlacementPolicy();
+            }
+
+            @Override
+            public BookieClient getBookieClient() {
+                return BookKeeper.this.getBookieClient();
+            }
+
+            @Override
+            public OrderedExecutor getMainWorkerPool() {
+                return BookKeeper.this.getMainWorkerPool();
+            }
+
+            @Override
+            public OrderedScheduler getScheduler() {
+                return BookKeeper.this.getScheduler();
+            }
+
+            @Override
+            public BookKeeperClientStats getClientStats() {
+                return clientStats;
+            }
+
+            @Override
+            public boolean isClientClosed() {
+                return BookKeeper.this.isClosed();
+            }
+        };
+
+    ClientContext getClientCtx() {
+        return clientCtx;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 958dc779ba..59c203ba0f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -267,7 +267,7 @@ public void watchReadOnlyBookiesChanged(final RegistrationListener listener)
      * @see BookKeeper#asyncOpenLedger
      */
     public void asyncOpenLedger(final long lId, final OpenCallback cb, final Object ctx) {
-        new LedgerOpenOp(bkc, lId, cb, ctx).initiate();
+        new LedgerOpenOp(bkc, bkc.getClientCtx().getClientStats(), lId, cb, ctx).initiate();
     }
 
     /**
@@ -284,7 +284,7 @@ public LedgerHandle openLedger(final long lId) throws InterruptedException,
         CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         SyncOpenCallback result = new SyncOpenCallback(future);
 
-        new LedgerOpenOp(bkc, lId, result, null).initiate();
+        new LedgerOpenOp(bkc, bkc.getClientCtx().getClientStats(), lId, result, null).initiate();
 
         return SyncCallbackUtils.waitForResult(future);
     }
@@ -304,7 +304,7 @@ public LedgerHandle openLedger(final long lId) throws InterruptedException,
      * @see BookKeeper#asyncOpenLedgerNoRecovery
      */
     public void asyncOpenLedgerNoRecovery(final long lId, final OpenCallback cb, final Object ctx) {
-        new LedgerOpenOp(bkc, lId, cb, ctx).initiateWithoutRecovery();
+        new LedgerOpenOp(bkc, bkc.getClientCtx().getClientStats(), lId, cb, ctx).initiateWithoutRecovery();
     }
 
     /**
@@ -322,7 +322,7 @@ public LedgerHandle openLedgerNoRecovery(final long lId)
         CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         SyncOpenCallback result = new SyncOpenCallback(future);
 
-        new LedgerOpenOp(bkc, lId, result, null)
+        new LedgerOpenOp(bkc, bkc.getClientCtx().getClientStats(), lId, result, null)
                 .initiateWithoutRecovery();
 
         return SyncCallbackUtils.waitForResult(future);
@@ -893,6 +893,7 @@ public void processResult(int rc, String path, Object ctx) {
                         try {
                             LedgerFragmentReplicator.SingleFragmentCallback cb =
                                 new LedgerFragmentReplicator.SingleFragmentCallback(ledgerFragmentsMcb, lh,
+                                                                                    bkc.getMainWorkerPool(),
                                         startEntryId, getReplacementBookiesMap(ensemble, targetBookieAddresses));
                             LedgerFragment ledgerFragment = new LedgerFragment(lh,
                                 startEntryId, endEntryId, targetBookieAddresses.keySet());
@@ -1046,6 +1047,7 @@ private void replicateLedgerFragment(LedgerHandle lh,
         SingleFragmentCallback cb = new SingleFragmentCallback(
             resultCallBack,
             lh,
+            bkc.getMainWorkerPool(),
             ledgerFragment.getFirstEntryId(),
             getReplacementBookiesMap(ledgerFragment, targetBookieAddresses));
 
@@ -1423,7 +1425,7 @@ public void triggerAudit()
         }
 
         BookieSocketAddress auditorId =
-            AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.conf), bkc.getZkHandle());
+            AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.getConf()), bkc.getZkHandle());
         if (auditorId == null) {
             LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
             throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 30d14f0daa..2fa6753b10 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -21,6 +21,11 @@
 
 package org.apache.bookkeeper.client;
 
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
 /**
  * List of constants for defining client stats names.
  */
@@ -84,4 +89,144 @@
     String ACTIVE_TLS_CHANNEL_COUNTER = "ACTIVE_TLS_CHANNEL_COUNTER";
     String FAILED_CONNECTION_COUNTER = "FAILED_CONNECTION_COUNTER";
     String FAILED_TLS_HANDSHAKE_COUNTER = "FAILED_TLS_HANDSHAKE_COUNTER";
+
+    OpStatsLogger getCreateOpLogger();
+    OpStatsLogger getOpenOpLogger();
+    OpStatsLogger getDeleteOpLogger();
+    OpStatsLogger getRecoverOpLogger();
+    OpStatsLogger getReadOpLogger();
+    OpStatsLogger getReadLacAndEntryOpLogger();
+    OpStatsLogger getReadLacAndEntryRespLogger();
+    OpStatsLogger getAddOpLogger();
+    OpStatsLogger getForceOpLogger();
+    OpStatsLogger getWriteLacOpLogger();
+    OpStatsLogger getReadLacOpLogger();
+    OpStatsLogger getRecoverAddCountLogger();
+    OpStatsLogger getRecoverReadCountLogger();
+    Counter getReadOpDmCounter();
+    Counter getAddOpUrCounter();
+    Counter getSpeculativeReadCounter();
+    Counter getEnsembleBookieDistributionCounter(String bookie);
+    Counter getEnsembleChangeCounter();
+    Counter getLacUpdateHitsCounter();
+    Counter getLacUpdateMissesCounter();
+    OpStatsLogger getClientChannelWriteWaitLogger();
+    void registerPendingAddsGauge(Gauge<Integer> gauge);
+
+    static BookKeeperClientStats newInstance(StatsLogger stats) {
+        OpStatsLogger createOpLogger = stats.getOpStatsLogger(CREATE_OP);
+        OpStatsLogger deleteOpLogger = stats.getOpStatsLogger(DELETE_OP);
+        OpStatsLogger openOpLogger = stats.getOpStatsLogger(OPEN_OP);
+        OpStatsLogger recoverOpLogger = stats.getOpStatsLogger(RECOVER_OP);
+        OpStatsLogger readOpLogger = stats.getOpStatsLogger(READ_OP);
+        Counter readOpDmCounter = stats.getCounter(READ_OP_DM);
+        OpStatsLogger readLacAndEntryOpLogger = stats.getOpStatsLogger(READ_LAST_CONFIRMED_AND_ENTRY);
+        OpStatsLogger readLacAndEntryRespLogger = stats.getOpStatsLogger(READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
+        OpStatsLogger addOpLogger = stats.getOpStatsLogger(ADD_OP);
+        OpStatsLogger forceOpLogger = stats.getOpStatsLogger(FORCE_OP);
+        Counter addOpUrCounter = stats.getCounter(ADD_OP_UR);
+        OpStatsLogger writeLacOpLogger = stats.getOpStatsLogger(WRITE_LAC_OP);
+        OpStatsLogger readLacOpLogger = stats.getOpStatsLogger(READ_LAC_OP);
+        OpStatsLogger recoverAddEntriesStats = stats.getOpStatsLogger(LEDGER_RECOVER_ADD_ENTRIES);
+        OpStatsLogger recoverReadEntriesStats = stats.getOpStatsLogger(LEDGER_RECOVER_READ_ENTRIES);
+
+        Counter ensembleChangeCounter = stats.getCounter(ENSEMBLE_CHANGES);
+        Counter lacUpdateHitsCounter = stats.getCounter(LAC_UPDATE_HITS);
+        Counter lacUpdateMissesCounter = stats.getCounter(LAC_UPDATE_MISSES);
+        OpStatsLogger clientChannelWriteWaitStats = stats.getOpStatsLogger(CLIENT_CHANNEL_WRITE_WAIT);
+
+        Counter speculativeReadCounter = stats.getCounter(SPECULATIVE_READ_COUNT);
+
+        return new BookKeeperClientStats() {
+            @Override
+            public OpStatsLogger getCreateOpLogger() {
+                return createOpLogger;
+            }
+            @Override
+            public OpStatsLogger getOpenOpLogger() {
+                return openOpLogger;
+            }
+            @Override
+            public OpStatsLogger getDeleteOpLogger() {
+                return deleteOpLogger;
+            }
+            @Override
+            public OpStatsLogger getRecoverOpLogger() {
+                return recoverOpLogger;
+            }
+            @Override
+            public OpStatsLogger getReadOpLogger() {
+                return readOpLogger;
+            }
+            @Override
+            public OpStatsLogger getReadLacAndEntryOpLogger() {
+                return readLacAndEntryOpLogger;
+            }
+            @Override
+            public OpStatsLogger getReadLacAndEntryRespLogger() {
+                return readLacAndEntryRespLogger;
+            }
+            @Override
+            public OpStatsLogger getAddOpLogger() {
+                return addOpLogger;
+            }
+            @Override
+            public OpStatsLogger getForceOpLogger() {
+                return forceOpLogger;
+            }
+            @Override
+            public OpStatsLogger getWriteLacOpLogger() {
+                return writeLacOpLogger;
+            }
+            @Override
+            public OpStatsLogger getReadLacOpLogger() {
+                return readLacOpLogger;
+            }
+            @Override
+            public OpStatsLogger getRecoverAddCountLogger() {
+                return recoverAddEntriesStats;
+            }
+            @Override
+            public OpStatsLogger getRecoverReadCountLogger() {
+                return recoverReadEntriesStats;
+            }
+            @Override
+            public Counter getReadOpDmCounter() {
+                return readOpDmCounter;
+            }
+            @Override
+            public Counter getAddOpUrCounter() {
+                return addOpUrCounter;
+            }
+            @Override
+            public Counter getSpeculativeReadCounter() {
+                return speculativeReadCounter;
+            }
+            @Override
+            public Counter getEnsembleChangeCounter() {
+                return ensembleChangeCounter;
+            }
+            @Override
+            public Counter getLacUpdateHitsCounter() {
+                return lacUpdateHitsCounter;
+            }
+            @Override
+            public Counter getLacUpdateMissesCounter() {
+                return lacUpdateMissesCounter;
+            }
+            @Override
+            public OpStatsLogger getClientChannelWriteWaitLogger() {
+                return clientChannelWriteWaitStats;
+            }
+            @Override
+            public Counter getEnsembleBookieDistributionCounter(String bookie) {
+                return stats.getCounter(LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION + "-" + bookie);
+            }
+            @Override
+            public void registerPendingAddsGauge(Gauge<Integer> gauge) {
+                stats.registerGauge(PENDING_ADDS, gauge);
+            }
+        };
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
new file mode 100644
index 0000000000..d8803d0ea6
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieClient;
+
+/**
+ * Collection of client objects used by LedgerHandle to interact with
+ * the outside world. Normally these are instantiated by the BookKeeper object,
+ * but they are presented to the LedgerHandle through this interface to allow
+ * tests to easily inject mocked versions.
+ */
+interface ClientContext {
+    ClientInternalConf getConf(); // client settings held as final fields of ClientInternalConf
+    LedgerManager getLedgerManager(); // LedgerHandle writes ledger metadata through this
+    BookieWatcher getBookieWatcher();
+    EnsemblePlacementPolicy getPlacementPolicy();
+    BookieClient getBookieClient(); // e.g. queried by LedgerHandle for per-bookie pending request counts
+    OrderedExecutor getMainWorkerPool(); // LedgerHandle submits work here ordered by ledger id
+    OrderedScheduler getScheduler(); // runs fixed-rate tasks: pending-add monitor, explicit LAC flush
+    BookKeeperClientStats getClientStats(); // source of the counters and op-stats loggers used by handles
+    boolean isClientClosed();
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
new file mode 100644
index 0000000000..ac56a1fbfb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.base.Optional;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.feature.SettableFeatureProvider;
+
+class ClientInternalConf { // client settings read once from ClientConfiguration; every field is final
+    final Feature disableEnsembleChangeFeature; // resolved via conf.getDisableEnsembleChangeFeatureName()
+    final boolean delayEnsembleChange;
+
+    final Optional<SpeculativeRequestExecutionPolicy> readSpeculativeRequestPolicy; // absent if first timeout <= 0
+    final Optional<SpeculativeRequestExecutionPolicy> readLACSpeculativeRequestPolicy; // same rule, LAC timeouts
+
+    final int explicitLacInterval; // ExplicitLacFlushPolicyImpl schedules with this as milliseconds
+    final long waitForWriteSetMs; // from conf.getWaitTimeoutOnBackpressureMillis()
+    final long addEntryQuorumTimeoutNanos; // conf.getAddEntryQuorumTimeout() converted from seconds to nanos
+    final boolean enableParallelRecoveryRead;
+    final boolean enableReorderReadSequence;
+    final int recoveryReadBatchSize;
+    final int throttleValue; // LedgerHandle creates a RateLimiter only when this is > 0
+    final int bookieFailureHistoryExpirationMSec; // expireAfterWrite (ms) of LedgerHandle's failure-history cache
+    final int maxAllowedEnsembleChanges;
+    final long timeoutMonitorIntervalSec; // period, in seconds, of LedgerHandle's pending-add monitor
+    final boolean enableBookieFailureTracking;
+    final boolean useV2WireProtocol; // passed to DigestManager.instantiate by LedgerHandle
+
+    static ClientInternalConf defaultValues() {
+        return fromConfig(new ClientConfiguration()); // default-constructed configuration
+    }
+
+    static ClientInternalConf fromConfig(ClientConfiguration conf) {
+        return fromConfigAndFeatureProvider(conf, SettableFeatureProvider.DISABLE_ALL);
+    }
+
+    static ClientInternalConf fromConfigAndFeatureProvider(ClientConfiguration conf,
+                                                           FeatureProvider featureProvider) {
+        return new ClientInternalConf(conf, featureProvider);
+    }
+
+    private ClientInternalConf(ClientConfiguration conf,
+                               FeatureProvider featureProvider) {
+        this.explicitLacInterval = conf.getExplictLacInterval();
+        this.enableReorderReadSequence = conf.isReorderReadSequenceEnabled();
+        this.enableParallelRecoveryRead = conf.getEnableParallelRecoveryRead();
+        this.recoveryReadBatchSize = conf.getRecoveryReadBatchSize();
+        this.waitForWriteSetMs = conf.getWaitTimeoutOnBackpressureMillis();
+        this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout());
+        this.throttleValue = conf.getThrottleValue();
+        this.bookieFailureHistoryExpirationMSec = conf.getBookieFailureHistoryExpirationMSec();
+
+        this.disableEnsembleChangeFeature = featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
+
+        this.delayEnsembleChange = conf.getDelayEnsembleChange();
+        this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges();
+        this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec();
+        this.enableBookieFailureTracking = conf.getEnableBookieFailureTracking();
+        this.useV2WireProtocol = conf.getUseV2WireProtocol();
+
+        if (conf.getFirstSpeculativeReadTimeout() > 0) { // policy exists only for a positive first timeout
+            this.readSpeculativeRequestPolicy =
+                    Optional.of(new DefaultSpeculativeRequestExecutionPolicy(
+                                        conf.getFirstSpeculativeReadTimeout(),
+                                        conf.getMaxSpeculativeReadTimeout(),
+                                        conf.getSpeculativeReadTimeoutBackoffMultiplier()));
+        } else {
+            this.readSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
+        }
+        if (conf.getFirstSpeculativeReadLACTimeout() > 0) { // same rule for read-LAC requests
+            this.readLACSpeculativeRequestPolicy =
+                    Optional.of(new DefaultSpeculativeRequestExecutionPolicy(
+                        conf.getFirstSpeculativeReadLACTimeout(),
+                        conf.getMaxSpeculativeReadLACTimeout(),
+                        conf.getSpeculativeReadLACTimeoutBackoffMultiplier()));
+        } else {
+            this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index 96c8998db4..e2b3801896 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -53,10 +53,15 @@ public void updatePiggyBackedLac(long piggyBackedLac) {
         volatile long piggyBackedLac = LedgerHandle.INVALID_ENTRY_ID;
         volatile long explicitLac = LedgerHandle.INVALID_ENTRY_ID;
         final LedgerHandle lh;
+        final ClientContext clientCtx;
+
         ScheduledFuture<?> scheduledFuture;
 
-        ExplicitLacFlushPolicyImpl(LedgerHandle lh) {
+        ExplicitLacFlushPolicyImpl(LedgerHandle lh,
+                                   ClientContext clientCtx) {
             this.lh = lh;
+            this.clientCtx = clientCtx;
+
             scheduleExplictLacFlush();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Scheduled Explicit Last Add Confirmed Update");
@@ -80,7 +85,6 @@ public void setPiggyBackedLac(long piggyBackedLac) {
         }
 
         private void scheduleExplictLacFlush() {
-            int explicitLacIntervalInMs = lh.bk.getExplicitLacInterval();
             final SafeRunnable updateLacTask = new SafeRunnable() {
                 @Override
                 public void safeRun() {
@@ -116,7 +120,8 @@ public String toString() {
                 }
             };
             try {
-                scheduledFuture = lh.bk.getScheduler().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
+                long explicitLacIntervalInMs = clientCtx.getConf().explicitLacInterval;
+                scheduledFuture = clientCtx.getScheduler().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
                         explicitLacIntervalInMs, explicitLacIntervalInMs, TimeUnit.MILLISECONDS);
             } catch (RejectedExecutionException re) {
                 LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}",
@@ -129,13 +134,13 @@ public String toString() {
          */
         void asyncExplicitLacFlush(final long explicitLac) {
             final LastAddConfirmedCallback cb = LastAddConfirmedCallback.INSTANCE;
-            final PendingWriteLacOp op = new PendingWriteLacOp(lh, cb, null);
+            final PendingWriteLacOp op = new PendingWriteLacOp(lh, clientCtx, cb, null);
             op.setLac(explicitLac);
             try {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Sending Explicit LAC: {}", explicitLac);
                 }
-                lh.bk.getMainWorkerPool().submit(new SafeRunnable() {
+                clientCtx.getMainWorkerPool().submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         ByteBufList toSend = lh.macManager
@@ -144,7 +149,9 @@ public void safeRun() {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                cb.addLacComplete(lh.bk.getReturnRc(BKException.Code.InterruptedException), lh, null);
+                cb.addLacComplete(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
+                                                         BKException.Code.InterruptedException),
+                                  lh, null);
             }
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index a48a58bfac..f87bfe33f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
@@ -44,14 +45,17 @@
     long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
 
     final LedgerHandle lh;
+    final BookieClient bookieClient;
 
-    ForceLedgerOp(LedgerHandle lh, CompletableFuture<Void> cb) {
+    ForceLedgerOp(LedgerHandle lh, BookieClient bookieClient,
+                  CompletableFuture<Void> cb) {
         this.lh = lh;
+        this.bookieClient = bookieClient;
         this.cb = cb;
     }
 
     void sendForceLedgerRequest(int bookieIndex) {
-        lh.bk.getBookieClient().forceLedger(currentEnsemble.get(bookieIndex), lh.ledgerId, this, bookieIndex);
+        bookieClient.forceLedger(currentEnsemble.get(bookieIndex), lh.ledgerId, this, bookieIndex);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 4f8beed1b4..e39b2a19b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -45,7 +45,6 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,9 +68,9 @@
     final EnumSet<WriteFlag> writeFlags;
     final long startTime;
     final OpStatsLogger createOpLogger;
+    final BookKeeperClientStats clientStats;
     boolean adv = false;
     boolean generateLedgerId = true;
-    private final StatsLogger statsLogger;
 
     /**
      * Constructor.
@@ -100,7 +99,7 @@
             BookKeeper bk, int ensembleSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType,
             byte[] passwd, CreateCallback cb, Object ctx, final Map<String, byte[]> customMetadata,
             EnumSet<WriteFlag> writeFlags,
-            StatsLogger statsLogger) {
+            BookKeeperClientStats clientStats) {
         this.bk = bk;
         this.metadata = new LedgerMetadata(
             ensembleSize,
@@ -116,8 +115,8 @@
         this.cb = cb;
         this.ctx = ctx;
         this.startTime = MathUtils.nowInNano();
-        this.createOpLogger = bk.getCreateOpLogger();
-        this.statsLogger = statsLogger;
+        this.createOpLogger = clientStats.getCreateOpLogger();
+        this.clientStats = clientStats;
     }
 
     /**
@@ -200,11 +199,9 @@ public void operationComplete(int rc, LedgerMetadata writtenMetadata) {
 
         try {
             if (adv) {
-                lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType,
-                        passwd, writeFlags);
+                lh = new LedgerHandleAdv(bk.getClientCtx(), ledgerId, metadata, digestType, passwd, writeFlags);
             } else {
-                lh = new LedgerHandle(bk, ledgerId, metadata, digestType,
-                        passwd, writeFlags);
+                lh = new LedgerHandle(bk.getClientCtx(), ledgerId, metadata, digestType, passwd, writeFlags);
             }
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while creating ledger: " + ledgerId, e);
@@ -220,8 +217,7 @@ public void operationComplete(int rc, LedgerMetadata writtenMetadata) {
         LOG.info("Ensemble: {} for ledger: {}", curEns, lh.getId());
 
         for (BookieSocketAddress bsa : curEns) {
-            String ensSpread = BookKeeperClientStats.LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION + "-" + bsa;
-            statsLogger.getCounter(ensSpread).inc();
+            clientStats.getEnsembleBookieDistributionCounter(bsa.toString()).inc();
         }
 
         // return the ledger handle back
@@ -357,7 +353,8 @@ private void create(CreateCallback cb) {
             }
             LedgerCreateOp op = new LedgerCreateOp(bk, builderEnsembleSize,
                 builderWriteQuorumSize, builderAckQuorumSize, DigestType.fromApiDigestType(builderDigestType),
-                builderPassword, cb, null, builderCustomMetadata, builderWriteFlags, bk.getStatsLogger());
+                builderPassword, cb, null, builderCustomMetadata, builderWriteFlags,
+                bk.getClientCtx().getClientStats());
             ReentrantReadWriteLock closeLock = bk.getCloseLock();
             closeLock.readLock().lock();
             try {
@@ -417,7 +414,7 @@ private void create(CreateCallback cb) {
                     DigestType.fromApiDigestType(parent.builderDigestType),
                     parent.builderPassword, cb, null, parent.builderCustomMetadata,
                     parent.builderWriteFlags,
-                    parent.bk.getStatsLogger());
+                    parent.bk.getClientCtx().getClientStats());
             ReentrantReadWriteLock closeLock = parent.bk.getCloseLock();
             closeLock.readLock().lock();
             try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
index 45ec4255a9..f2461d5089 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
@@ -62,14 +62,15 @@
      * @param ctx
      *            optional control object
      */
-    LedgerDeleteOp(BookKeeper bk, long ledgerId, DeleteCallback cb, Object ctx) {
+    LedgerDeleteOp(BookKeeper bk, BookKeeperClientStats clientStats,
+                   long ledgerId, DeleteCallback cb, Object ctx) {
         super(bk.getMainWorkerPool(), ledgerId);
         this.bk = bk;
         this.ledgerId = ledgerId;
         this.cb = cb;
         this.ctx = ctx;
         this.startTime = MathUtils.nowInNano();
-        this.deleteOpLogger = bk.getDeleteOpLogger();
+        this.deleteOpLogger = clientStats.getDeleteOpLogger();
     }
 
     /**
@@ -135,7 +136,7 @@ private void delete(Long ledgerId, AsyncCallback.DeleteCallback cb) {
                 cb.deleteComplete(BKException.Code.IncorrectParameterException, null);
                 return;
             }
-            LedgerDeleteOp op = new LedgerDeleteOp(bk, ledgerId, cb, null);
+            LedgerDeleteOp op = new LedgerDeleteOp(bk, bk.getClientCtx().getClientStats(), ledgerId, cb, null);
             ReentrantReadWriteLock closeLock = bk.getCloseLock();
             closeLock.readLock().lock();
             try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 64394def2b..eb11b30c5a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -36,6 +36,7 @@
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -340,14 +341,16 @@ public void readComplete(int rc, LedgerHandle lh,
     static class SingleFragmentCallback implements AsyncCallback.VoidCallback {
         final AsyncCallback.VoidCallback ledgerFragmentsMcb;
         final LedgerHandle lh;
+        final OrderedExecutor mainWorkerPool;
         final long fragmentStartId;
         final Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie;
 
         SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
-                LedgerHandle lh, long fragmentStartId,
+                               LedgerHandle lh, OrderedExecutor mainWorkerPool, long fragmentStartId,
                 Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
             this.ledgerFragmentsMcb = ledgerFragmentsMcb;
             this.lh = lh;
+            this.mainWorkerPool = mainWorkerPool;
             this.fragmentStartId = fragmentStartId;
             this.oldBookie2NewBookie = oldBookie2NewBookie;
         }
@@ -360,7 +363,7 @@ public void processResult(int rc, String path, Object ctx) {
                 ledgerFragmentsMcb.processResult(rc, null, null);
                 return;
             }
-            updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh, oldBookie2NewBookie);
+            updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh, mainWorkerPool, oldBookie2NewBookie);
         }
     }
 
@@ -369,7 +372,8 @@ public void processResult(int rc, String path, Object ctx) {
      */
     private static void updateEnsembleInfo(
             AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
-            LedgerHandle lh, Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
+            LedgerHandle lh, OrderedExecutor mainWorkerPool,
+            Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
         /*
          * Update the ledger metadata's ensemble info to point to the new
          * bookie.
@@ -388,7 +392,7 @@ private static void updateEnsembleInfo(
         }
         lh.getLedgerMetadata().updateEnsemble(fragmentStartId, newEnsemble);
         lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
-                fragmentStartId, lh, oldBookie2NewBookie));
+                                                  fragmentStartId, lh, mainWorkerPool, oldBookie2NewBookie));
     }
 
     /**
@@ -399,14 +403,17 @@ private static void updateEnsembleInfo(
     private static class UpdateEnsembleCb implements GenericCallback<LedgerMetadata> {
         final AsyncCallback.VoidCallback ensembleUpdatedCb;
         final LedgerHandle lh;
+        final OrderedExecutor mainWorkerPool;
         final long fragmentStartId;
         final Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie;
 
         public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
                 long fragmentStartId, LedgerHandle lh,
+                OrderedExecutor mainWorkerPool,
                 Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
             this.ensembleUpdatedCb = ledgerFragmentsMcb;
             this.lh = lh;
+            this.mainWorkerPool = mainWorkerPool;
             this.fragmentStartId = fragmentStartId;
             this.oldBookie2NewBookie = oldBookie2NewBookie;
         }
@@ -419,8 +426,7 @@ public void operationComplete(int rc, LedgerMetadata writtenMetadata) {
                 // try again, the previous success (with which this has
                 // conflicted) will have updated the stat other operations
                 // such as (addEnsemble) would update it too.
-                lh.rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(
-                                lh.bk.mainWorkerPool, lh.getId()) {
+                lh.rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(mainWorkerPool, lh.getId()) {
                             @Override
                             public void safeOperationComplete(int rc,
                                     LedgerMetadata newMeta) {
@@ -440,7 +446,7 @@ public void safeOperationComplete(int rc,
                                         }
                                     }
                                     updateEnsembleInfo(ensembleUpdatedCb,
-                                            fragmentStartId, lh, oldBookie2NewBookie);
+                                                       fragmentStartId, lh, mainWorkerPool, oldBookie2NewBookie);
                                 }
                             }
                             @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 088a7c5aa5..4f65ab3378 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -98,9 +98,10 @@
 public class LedgerHandle implements WriteHandle {
     static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
 
+    final ClientContext clientCtx;
+
     final byte[] ledgerKey;
     private LedgerMetadata metadata;
-    final BookKeeper bk;
     final long ledgerId;
     long lastAddPushed;
 
@@ -121,13 +122,11 @@
     final DistributionSchedule distributionSchedule;
     final RateLimiter throttler;
     final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
-    final boolean enableParallelRecoveryRead;
-    final int recoveryReadBatchSize;
     final BookiesHealthInfo bookiesHealthInfo;
     final EnumSet<WriteFlag> writeFlags;
+
     ScheduledFuture<?> timeoutFuture = null;
 
-    final long waitForWriteSetMs;
     private final Map<Integer, BookieSocketAddress> delayedWriteFailedBookies =
             new HashMap<Integer, BookieSocketAddress>();
 
@@ -145,7 +144,6 @@
 
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
-    final int maxAllowedEnsembleChanges;
     Queue<PendingAddOp> pendingAddOps;
     ExplicitLacFlushPolicy explicitLacFlushPolicy;
 
@@ -168,17 +166,16 @@
         return delayedWriteFailedBookies;
     }
 
-    LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
-                 BookKeeper.DigestType digestType, byte[] password, EnumSet<WriteFlag> writeFlags)
+    LedgerHandle(ClientContext clientCtx,
+                 long ledgerId, LedgerMetadata metadata,
+                 BookKeeper.DigestType digestType, byte[] password,
+                 EnumSet<WriteFlag> writeFlags)
             throws GeneralSecurityException, NumberFormatException {
-        this.bk = bk;
+        this.clientCtx = clientCtx;
+
         this.metadata = metadata;
         this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
-        this.enableParallelRecoveryRead = bk.getConf().getEnableParallelRecoveryRead();
-        this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
         this.writeFlags = writeFlags;
-        this.waitForWriteSetMs = bk.getConf().getWaitTimeoutOnBackpressureMillis();
-
         if (metadata.isClosed()) {
             lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
             length = metadata.getLength();
@@ -191,14 +188,14 @@
 
         this.ledgerId = ledgerId;
 
-        if (bk.getConf().getThrottleValue() > 0) {
-            this.throttler = RateLimiter.create(bk.getConf().getThrottleValue());
+        if (clientCtx.getConf().throttleValue > 0) {
+            this.throttler = RateLimiter.create(clientCtx.getConf().throttleValue);
         } else {
             this.throttler = null;
         }
 
         macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType),
-                bk.getConf().getUseV2WireProtocol());
+                                               clientCtx.getConf().useV2WireProtocol);
 
         // If the password is empty, pass the same random ledger key which is generated by the hash of the empty
         // password, so that the bookie can avoid processing the keys for each entry
@@ -206,7 +203,7 @@
         distributionSchedule = new RoundRobinDistributionSchedule(
                 metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
         this.bookieFailureHistory = CacheBuilder.newBuilder()
-            .expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
+            .expireAfterWrite(clientCtx.getConf().bookieFailureHistoryExpirationMSec, TimeUnit.MILLISECONDS)
             .build(new CacheLoader<BookieSocketAddress, Long>() {
             @Override
             public Long load(BookieSocketAddress key) {
@@ -222,52 +219,48 @@ public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress) {
 
             @Override
             public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
-                return bk.bookieClient.getNumPendingRequests(bookieSocketAddress, ledgerId);
+                return clientCtx.getBookieClient().getNumPendingRequests(bookieSocketAddress, ledgerId);
             }
         };
 
-        maxAllowedEnsembleChanges = bk.getConf().getMaxAllowedEnsembleChanges();
-        ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
-        lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
-        lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
-        clientChannelWriteWaitStats = bk.getStatsLogger()
-                .getOpStatsLogger(BookKeeperClientStats.CLIENT_CHANNEL_WRITE_WAIT);
-        bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
-                                          new Gauge<Integer>() {
-                                              @Override
-                                            public Integer getDefaultValue() {
-                                                  return 0;
-                                              }
-                                              @Override
-                                            public Integer getSample() {
-                                                  return pendingAddOps.size();
-                                              }
-                                          });
+        ensembleChangeCounter = clientCtx.getClientStats().getEnsembleChangeCounter();
+        lacUpdateHitsCounter = clientCtx.getClientStats().getLacUpdateHitsCounter();
+        lacUpdateMissesCounter = clientCtx.getClientStats().getLacUpdateMissesCounter();
+        clientChannelWriteWaitStats = clientCtx.getClientStats().getClientChannelWriteWaitLogger();
+
+        clientCtx.getClientStats().registerPendingAddsGauge(new Gauge<Integer>() {
+                @Override
+                public Integer getDefaultValue() {
+                    return 0;
+                }
+                @Override
+                public Integer getSample() {
+                    return pendingAddOps.size();
+                }
+            });
         initializeExplicitLacFlushPolicy();
 
-        if (bk.getConf().getAddEntryQuorumTimeout() > 0) {
+        if (clientCtx.getConf().addEntryQuorumTimeoutNanos > 0) {
             SafeRunnable monitor = new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         monitorPendingAddOps();
                     }
                 };
-            this.timeoutFuture = bk.scheduler.scheduleAtFixedRate(monitor,
-                                                                  bk.getConf().getTimeoutMonitorIntervalSec(),
-                                                                  bk.getConf().getTimeoutMonitorIntervalSec(),
-                                                                  TimeUnit.SECONDS);
+            this.timeoutFuture = clientCtx.getScheduler().scheduleAtFixedRate(
+                    monitor,
+                    clientCtx.getConf().timeoutMonitorIntervalSec,
+                    clientCtx.getConf().timeoutMonitorIntervalSec,
+                    TimeUnit.SECONDS);
         }
     }
 
-    BookKeeper getBk() {
-        return bk;
-    }
-
     protected void initializeExplicitLacFlushPolicy() {
         if (!getLedgerMetadata().isClosed()
             && !(this instanceof ReadOnlyLedgerHandle)
-            && bk.getExplicitLacInterval() > 0) {
-            explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this);
+            && clientCtx.getConf().explicitLacInterval > 0) {
+            explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(
+                    this, clientCtx);
         } else {
             explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
         }
@@ -432,7 +425,7 @@ void writeLedgerConfig(GenericCallback<LedgerMetadata> writeCb) {
             LOG.debug("Writing metadata to ledger manager: {}, {}", this.ledgerId, getLedgerMetadata().getVersion());
         }
 
-        bk.getLedgerManager().writeLedgerMetadata(ledgerId, getLedgerMetadata(), writeCb);
+        clientCtx.getLedgerManager().writeLedgerMetadata(ledgerId, getLedgerMetadata(), writeCb);
     }
 
     /**
@@ -490,8 +483,9 @@ void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc)
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Failed to close ledger {} : ", ledgerId, re);
             }
-            errorOutPendingAdds(bk.getReturnRc(rc));
-            cb.closeComplete(bk.getReturnRc(BKException.Code.InterruptedException), this, ctx);
+            errorOutPendingAdds(BookKeeper.getReturnRc(clientCtx.getBookieClient(), rc));
+            cb.closeComplete(BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
+                             this, ctx);
         }
     }
 
@@ -505,7 +499,7 @@ void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc)
      * @param rc
      */
     void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
-        bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+        clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
                 final long prevLastEntryId;
@@ -568,14 +562,14 @@ public void safeRun() {
 
                 final class CloseCb extends OrderedGenericCallback<LedgerMetadata> {
                     CloseCb() {
-                        super(bk.getMainWorkerPool(), ledgerId);
+                        super(clientCtx.getMainWorkerPool(), ledgerId);
                     }
 
                     @Override
                     public void safeOperationComplete(final int rc, LedgerMetadata writtenMetadata) {
                         if (rc == BKException.Code.MetadataVersionException) {
-                            rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
-                                                                                          ledgerId) {
+                            rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(clientCtx.getMainWorkerPool(),
+                                                                                      ledgerId) {
                                 @Override
                                 public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
                                     if (newrc != BKException.Code.OK) {
@@ -816,7 +810,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal
 
     void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb,
                                   Object ctx, boolean isRecoveryRead) {
-        if (!bk.isClosed()) {
+        if (!clientCtx.isClientClosed()) {
             readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead)
                 .whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
                     @Override
@@ -842,7 +836,7 @@ public void onFailure(Throwable cause) {
                             cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, ctx);
                         }
                     }
-                }, bk.getMainWorkerPool().chooseThread(ledgerId));
+                    }, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
         } else {
             cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx);
         }
@@ -883,8 +877,9 @@ public LedgerEntry readLastEntry()
     CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
                                                               long lastEntry,
                                                               boolean isRecoveryRead) {
-        PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead);
-        if (!bk.isClosed()) {
+        PendingReadOp op = new PendingReadOp(this, clientCtx,
+                                             firstEntry, lastEntry, isRecoveryRead);
+        if (!clientCtx.isClientClosed()) {
             // Waiting on the first one.
             // This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive
             // state later after N requests sent.
@@ -899,14 +894,14 @@ public LedgerEntry readLastEntry()
             // unresponsive thus helpful enough.
             DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry);
             try {
-                if (!waitForWritable(ws, firstEntry, ws.size() - 1, waitForWriteSetMs)) {
+                if (!waitForWritable(ws, firstEntry, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
                     op.allowFailFastOnUnwritableChannel();
                 }
             } finally {
                 ws.recycle();
             }
 
-            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
+            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
         } else {
             op.future().completeExceptionally(BKException.create(ClientClosedException));
         }
@@ -1057,7 +1052,7 @@ public void asyncAddEntry(final byte[] data, final int offset, final int length,
     }
 
     public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
-        PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
+        PendingAddOp op = PendingAddOp.create(this, clientCtx, data, writeFlags, cb, ctx);
         doAsyncAddEntry(op);
     }
 
@@ -1140,7 +1135,7 @@ public void asyncAddEntry(final long entryId, ByteBuf data,
     @Override
     public CompletableFuture<Void> force() {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        ForceLedgerOp op = new ForceLedgerOp(this, result);
+        ForceLedgerOp op = new ForceLedgerOp(this, clientCtx.getBookieClient(), result);
         boolean wasClosed = false;
         synchronized (this) {
             // synchronized on this to ensure that
@@ -1154,7 +1149,7 @@ public void asyncAddEntry(final long entryId, ByteBuf data,
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Force() attempted on a closed ledger: {}", ledgerId);
@@ -1174,7 +1169,7 @@ public String toString() {
 
         // early exit: no write has been issued yet
         if (pendingAddsSequenceHead == INVALID_ENTRY_ID) {
-            bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         FutureUtils.complete(result, null);
@@ -1189,7 +1184,7 @@ public String toString() {
         }
 
         try {
-            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
+            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
             result.completeExceptionally(new BKException.BKInterruptedException());
         }
@@ -1207,7 +1202,8 @@ public String toString() {
      */
     void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length,
                                final AddCallback cb, final Object ctx) {
-        PendingAddOp op = PendingAddOp.create(this, Unpooled.wrappedBuffer(data, offset, length),
+        PendingAddOp op = PendingAddOp.create(this, clientCtx,
+                                              Unpooled.wrappedBuffer(data, offset, length),
                                               writeFlags, cb, ctx)
                 .enableRecoveryAdd();
         doAsyncAddEntry(op);
@@ -1224,7 +1220,7 @@ private boolean isWritesetWritable(DistributionSchedule.WriteSet writeSet,
 
         int nonWritableCount = 0;
         for (int i = 0; i < sz; i++) {
-            if (!bk.getBookieClient().isWritable(getLedgerMetadata().currentEnsemble.get(i), key)) {
+            if (!clientCtx.getBookieClient().isWritable(getLedgerMetadata().currentEnsemble.get(i), key)) {
                 nonWritableCount++;
                 if (nonWritableCount >= allowedNonWritableCount) {
                     return false;
@@ -1317,7 +1313,7 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
+                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
@@ -1331,7 +1327,8 @@ public String toString() {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
+                                                                    BKException.Code.InterruptedException),
                         LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
             }
             return;
@@ -1339,7 +1336,7 @@ public String toString() {
 
         DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
         try {
-            if (!waitForWritable(ws, op.getEntryId(), 0, waitForWriteSetMs)) {
+            if (!waitForWritable(ws, op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
                 op.allowFailFastOnUnwritableChannel();
             }
         } finally {
@@ -1347,9 +1344,10 @@ public String toString() {
         }
 
         try {
-            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
+            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+            op.cb.addCompleteWithLatency(
+                    BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
                     LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
         }
     }
@@ -1405,7 +1403,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
                 }
             };
 
-        new ReadLastConfirmedOp(this, innercb).initiate();
+        new ReadLastConfirmedOp(this, clientCtx.getBookieClient(), innercb).initiate();
     }
 
     /**
@@ -1451,7 +1449,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
                 }
             }
         };
-        new TryReadLastConfirmedOp(this, innercb, getLastAddConfirmed()).initiate();
+        new TryReadLastConfirmedOp(this, clientCtx.getBookieClient(), innercb, getLastAddConfirmed()).initiate();
     }
 
     /**
@@ -1560,11 +1558,7 @@ public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, Led
                 }
             }
         };
-        new ReadLastConfirmedAndEntryOp(this,
-            innercb,
-            entryId - 1,
-            timeOutInMillis,
-            bk.getScheduler())
+        new ReadLastConfirmedAndEntryOp(this, clientCtx, innercb, entryId - 1, timeOutInMillis)
             .parallelRead(parallel)
             .initiate();
     }
@@ -1707,7 +1701,7 @@ public void getLacComplete(int rc, long lac) {
                 }
             }
         };
-        new PendingReadLacOp(this, innercb).initiate();
+        new PendingReadLacOp(this, clientCtx.getBookieClient(), innercb).initiate();
     }
 
     /*
@@ -1845,7 +1839,7 @@ EnsembleInfo replaceBookieInMetadata(final Map<Integer, BookieSocketAddress> fai
                     continue;
                 }
                 try {
-                    BookieSocketAddress newBookie = bk.getBookieWatcher().replaceBookie(
+                    BookieSocketAddress newBookie = clientCtx.getBookieWatcher().replaceBookie(
                         metadata.getEnsembleSize(),
                         metadata.getWriteQuorumSize(),
                         metadata.getAckQuorumSize(),
@@ -1889,7 +1883,7 @@ void handleDelayedWriteBookieFailure() {
 
     void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
         int curBlockAddCompletions = blockAddCompletions.incrementAndGet();
-        if (bk.getDisableEnsembleChangeFeature().isAvailable()) {
+        if (clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()) {
             blockAddCompletions.decrementAndGet();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
@@ -1913,10 +1907,10 @@ void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies)
         int curNumEnsembleChanges = numEnsembleChanges.incrementAndGet();
 
         // when the ensemble changes are too frequent, close handle
-        if (curNumEnsembleChanges > maxAllowedEnsembleChanges) {
+        if (curNumEnsembleChanges > clientCtx.getConf().maxAllowedEnsembleChanges) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ledger {} reaches max allowed ensemble change number {}",
-                        ledgerId, maxAllowedEnsembleChanges);
+                          ledgerId, clientCtx.getConf().maxAllowedEnsembleChanges);
             }
             handleUnrecoverableErrorDuringAdd(WriteException);
             return;
@@ -1982,7 +1976,7 @@ public String toString() {
         ChangeEnsembleCb(EnsembleInfo ensembleInfo,
                          int curBlockAddCompletions,
                          int ensembleChangeIdx) {
-            super(bk.getMainWorkerPool(), ledgerId);
+            super(clientCtx.getMainWorkerPool(), ledgerId);
             this.ensembleInfo = ensembleInfo;
             this.curBlockAddCompletions = curBlockAddCompletions;
             this.ensembleChangeIdx = ensembleChangeIdx;
@@ -2045,7 +2039,7 @@ public String toString() {
                                EnsembleInfo ensembleInfo,
                                int curBlockAddCompletions,
                                int ensembleChangeIdx) {
-            super(bk.getMainWorkerPool(), ledgerId);
+            super(clientCtx.getMainWorkerPool(), ledgerId);
             this.rc = rc;
             this.ensembleInfo = ensembleInfo;
             this.curBlockAddCompletions = curBlockAddCompletions;
@@ -2216,11 +2210,11 @@ void unsetSuccessAndSendWriteRequest(final Set<Integer> bookies) {
     }
 
     void rereadMetadata(final GenericCallback<LedgerMetadata> cb) {
-        bk.getLedgerManager().readLedgerMetadata(ledgerId, cb);
+        clientCtx.getLedgerManager().readLedgerMetadata(ledgerId, cb);
     }
 
     void registerOperationFailureOnBookie(BookieSocketAddress bookie, long entryId) {
-        if (bk.getConf().getEnableBookieFailureTracking()) {
+        if (clientCtx.getConf().enableBookieFailureTracking) {
             bookieFailureHistory.put(bookie, entryId);
         }
     }
@@ -2246,7 +2240,7 @@ void recover(GenericCallback<Void> finalCb,
         final GenericCallback<Void> cb = new TimedGenericCallback<Void>(
             finalCb,
             BKException.Code.OK,
-            bk.getRecoverOpLogger());
+            clientCtx.getClientStats().getRecoverOpLogger());
         boolean wasClosed = false;
         boolean wasInRecovery = false;
 
@@ -2283,20 +2277,18 @@ void recover(GenericCallback<Void> finalCb,
         if (wasInRecovery) {
             // if metadata is already in recover, dont try to write again,
             // just do the recovery from the starting point
-            new LedgerRecoveryOp(LedgerHandle.this, cb)
-                    .parallelRead(enableParallelRecoveryRead)
-                    .readBatchSize(recoveryReadBatchSize)
+            new LedgerRecoveryOp(LedgerHandle.this, clientCtx, cb)
                     .setEntryListener(listener)
                     .initiate();
             return;
         }
 
-        writeLedgerConfig(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(), ledgerId) {
+        writeLedgerConfig(new OrderedGenericCallback<LedgerMetadata>(clientCtx.getMainWorkerPool(), ledgerId) {
             @Override
             public void safeOperationComplete(final int rc, LedgerMetadata writtenMetadata) {
                 if (rc == BKException.Code.MetadataVersionException) {
-                    rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(bk.getMainWorkerPool(),
-                                                                                  ledgerId) {
+                    rereadMetadata(new OrderedGenericCallback<LedgerMetadata>(clientCtx.getMainWorkerPool(),
+                                                                              ledgerId) {
                         @Override
                         public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
                             if (rc != BKException.Code.OK) {
@@ -2317,9 +2309,7 @@ public String toString() {
                     // in recovery otherwise, it couldn't prevent us advancing last confirmed while the other writer is
                     // closing the ledger, which will cause inconsistent last add confirmed on bookies & zookeeper
                     // metadata.
-                    new LedgerRecoveryOp(LedgerHandle.this, cb)
-                        .parallelRead(enableParallelRecoveryRead)
-                        .readBatchSize(recoveryReadBatchSize)
+                    new LedgerRecoveryOp(LedgerHandle.this, clientCtx, cb)
                         .setEntryListener(listener)
                         .initiate();
                 } else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 48beaa7790..e5fa0acd02 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -55,10 +55,11 @@ public int compare(PendingAddOp o1, PendingAddOp o2) {
         }
     }
 
-    LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
+    LedgerHandleAdv(ClientContext clientCtx,
+                    long ledgerId, LedgerMetadata metadata,
                     BookKeeper.DigestType digestType, byte[] password, EnumSet<WriteFlag> writeFlags)
             throws GeneralSecurityException, NumberFormatException {
-        super(bk, ledgerId, metadata, digestType, password, writeFlags);
+        super(clientCtx, ledgerId, metadata, digestType, password, writeFlags);
         pendingAddOps = new PriorityBlockingQueue<PendingAddOp>(10, new PendingOpsComparator());
     }
 
@@ -194,7 +195,7 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse
     @Override
     public void asyncAddEntry(final long entryId, ByteBuf data,
                               final AddCallbackWithLatency cb, final Object ctx) {
-        PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
+        PendingAddOp op = PendingAddOp.create(this, clientCtx, data, writeFlags, cb, ctx);
         op.setEntryId(entryId);
 
         if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
@@ -233,7 +234,7 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
         if (wasClosed) {
             // make sure the callback is triggered in main worker pool
             try {
-                bk.getMainWorkerPool().submit(new SafeRunnable() {
+                clientCtx.getMainWorkerPool().submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
@@ -246,21 +247,23 @@ public String toString() {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+                op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
+                                                                    BKException.Code.InterruptedException),
                         LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
             }
             return;
         }
 
         if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
-                op.getEntryId(), 0, waitForWriteSetMs)) {
+                    op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
             op.allowFailFastOnUnwritableChannel();
         }
 
         try {
-            bk.getMainWorkerPool().executeOrdered(ledgerId, op);
+            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException),
+            op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
+                                                                BKException.Code.InterruptedException),
                               LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index c3a69bd32d..cf5f3a7aeb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -73,7 +73,8 @@
      * @param cb
      * @param ctx
      */
-    public LedgerOpenOp(BookKeeper bk, long ledgerId, DigestType digestType, byte[] passwd,
+    public LedgerOpenOp(BookKeeper bk, BookKeeperClientStats clientStats,
+                        long ledgerId, DigestType digestType, byte[] passwd,
                         OpenCallback cb, Object ctx) {
         this.bk = bk;
         this.ledgerId = ledgerId;
@@ -82,10 +83,11 @@ public LedgerOpenOp(BookKeeper bk, long ledgerId, DigestType digestType, byte[]
         this.ctx = ctx;
         this.enableDigestAutodetection = bk.getConf().getEnableDigestTypeAutodetection();
         this.suggestedDigestType = digestType;
-        this.openOpLogger = bk.getOpenOpLogger();
+        this.openOpLogger = clientStats.getOpenOpLogger();
     }
 
-    public LedgerOpenOp(BookKeeper bk, long ledgerId, OpenCallback cb, Object ctx) {
+    public LedgerOpenOp(BookKeeper bk, BookKeeperClientStats clientStats,
+                        long ledgerId, OpenCallback cb, Object ctx) {
         this.bk = bk;
         this.ledgerId = ledgerId;
         this.cb = cb;
@@ -95,7 +97,7 @@ public LedgerOpenOp(BookKeeper bk, long ledgerId, OpenCallback cb, Object ctx) {
         this.administrativeOpen = true;
         this.enableDigestAutodetection = false;
         this.suggestedDigestType = bk.conf.getBookieRecoveryDigestType();
-        this.openOpLogger = bk.getOpenOpLogger();
+        this.openOpLogger = clientStats.getOpenOpLogger();
     }
 
     /**
@@ -165,7 +167,8 @@ public void operationComplete(int rc, LedgerMetadata metadata) {
 
         // get the ledger metadata back
         try {
-            lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd, !doRecovery);
+            lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, metadata, digestType,
+                                          passwd, !doRecovery);
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while opening ledger: " + ledgerId, e);
             openComplete(BKException.Code.DigestNotInitializedException, null);
@@ -247,7 +250,8 @@ private void open(OpenCallback cb) {
                 return;
             }
 
-            LedgerOpenOp op = new LedgerOpenOp(bk, ledgerId, fromApiDigestType(digestType),
+            LedgerOpenOp op = new LedgerOpenOp(bk, bk.getClientCtx().getClientStats(),
+                                               ledgerId, fromApiDigestType(digestType),
                                                password, cb, null);
             ReentrantReadWriteLock closeLock = bk.getCloseLock();
             closeLock.readLock().lock();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index d414b7b713..18b47e069f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -18,7 +18,6 @@
 package org.apache.bookkeeper.client;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -42,6 +41,8 @@
     static final Logger LOG = LoggerFactory.getLogger(LedgerRecoveryOp.class);
 
     final LedgerHandle lh;
+    final ClientContext clientCtx;
+
     final AtomicLong readCount, writeCount;
     volatile boolean readDone;
     final AtomicBoolean callbackDone;
@@ -50,8 +51,6 @@
     final GenericCallback<Void> cb;
     // keep a copy of metadata for recovery.
     LedgerMetadata metadataForRecovery;
-    boolean parallelRead = false;
-    int readBatchSize = 1;
 
     // EntryListener Hook
     @VisibleForTesting
@@ -59,10 +58,11 @@
 
     class RecoveryReadOp extends ListenerBasedPendingReadOp {
 
-        RecoveryReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
+        RecoveryReadOp(LedgerHandle lh,
+                       ClientContext clientCtx,
                        long startEntryId, long endEntryId,
                        ReadEntryListener cb, Object ctx) {
-            super(lh, scheduler, startEntryId, endEntryId, cb, ctx, true);
+            super(lh, clientCtx, startEntryId, endEntryId, cb, ctx, true);
         }
 
         @Override
@@ -72,23 +72,15 @@ protected LedgerMetadata getLedgerMetadata() {
 
     }
 
-    public LedgerRecoveryOp(LedgerHandle lh, GenericCallback<Void> cb) {
+    public LedgerRecoveryOp(LedgerHandle lh, ClientContext clientCtx,
+                            GenericCallback<Void> cb) {
         readCount = new AtomicLong(0);
         writeCount = new AtomicLong(0);
         readDone = false;
         callbackDone = new AtomicBoolean(false);
         this.cb = cb;
         this.lh = lh;
-    }
-
-    LedgerRecoveryOp parallelRead(boolean enabled) {
-        this.parallelRead = enabled;
-        return this;
-    }
-
-    LedgerRecoveryOp readBatchSize(int batchSize) {
-        this.readBatchSize = batchSize;
-        return this;
+        this.clientCtx = clientCtx;
     }
 
     /**
@@ -105,7 +97,7 @@ LedgerRecoveryOp setEntryListener(ReadEntryListener entryListener) {
     }
 
     public void initiate() {
-        ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh,
+        ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(),
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                     public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
                         if (rc == BKException.Code.OK) {
@@ -137,11 +129,11 @@ public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
 
     private void submitCallback(int rc) {
         if (BKException.Code.OK == rc) {
-            lh.bk.getRecoverAddCountLogger().registerSuccessfulValue(writeCount.get());
-            lh.bk.getRecoverReadCountLogger().registerSuccessfulValue(readCount.get());
+            clientCtx.getClientStats().getRecoverAddCountLogger().registerSuccessfulValue(writeCount.get());
+            clientCtx.getClientStats().getRecoverReadCountLogger().registerSuccessfulValue(readCount.get());
         } else {
-            lh.bk.getRecoverAddCountLogger().registerFailedValue(writeCount.get());
-            lh.bk.getRecoverReadCountLogger().registerFailedValue(readCount.get());
+            clientCtx.getClientStats().getRecoverAddCountLogger().registerFailedValue(writeCount.get());
+            clientCtx.getClientStats().getRecoverReadCountLogger().registerFailedValue(readCount.get());
         }
         cb.operationComplete(rc, null);
     }
@@ -152,9 +144,9 @@ private void submitCallback(int rc) {
     private void doRecoveryRead() {
         if (!callbackDone.get()) {
             startEntryToRead = endEntryToRead + 1;
-            endEntryToRead = endEntryToRead + readBatchSize;
-            new RecoveryReadOp(lh, lh.bk.getScheduler(), startEntryToRead, endEntryToRead, this, null)
-                    .parallelRead(parallelRead).initiate();
+            endEntryToRead = endEntryToRead + clientCtx.getConf().recoveryReadBatchSize;
+            new RecoveryReadOp(lh, clientCtx, startEntryToRead, endEntryToRead, this, null)
+                .initiate();
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
index 290e69bf66..108a805ca9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
@@ -33,29 +32,13 @@
     final Object ctx;
 
     ListenerBasedPendingReadOp(LedgerHandle lh,
-                               ScheduledExecutorService scheduler,
-                               long startEntryId,
-                               long endEntryId,
-                               ReadEntryListener listener,
-                               Object ctx) {
-        this(
-            lh,
-            scheduler,
-            startEntryId,
-            endEntryId,
-            listener,
-            ctx,
-            false);
-    }
-
-    ListenerBasedPendingReadOp(LedgerHandle lh,
-                               ScheduledExecutorService scheduler,
+                               ClientContext clientCtx,
                                long startEntryId,
                                long endEntryId,
                                ReadEntryListener listener,
                                Object ctx,
                                boolean isRecoveryRead) {
-        super(lh, scheduler, startEntryId, endEntryId, isRecoveryRead);
+        super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead);
         this.listener = listener;
         this.ctx = ctx;
     }
@@ -71,11 +54,13 @@ protected void submitCallback(int code) {
             long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
             LedgerEntry entry;
             if (BKException.Code.OK == request.getRc()) {
-                readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
+                clientCtx.getClientStats().getReadOpLogger()
+                    .registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
                 // callback with completed entry
                 entry = new LedgerEntry(request.entryImpl);
             } else {
-                readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
+                clientCtx.getClientStats().getReadOpLogger()
+                    .registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
                 entry = null;
             }
             request.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index afcaed6c00..d10bbe745b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -37,8 +37,6 @@
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -68,24 +66,24 @@
     boolean completed = false;
 
     LedgerHandle lh;
+    ClientContext clientCtx;
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
     long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
 
-    long timeoutNanos;
-
-    OpStatsLogger addOpLogger;
-    Counter addOpUrCounter;
     long currentLedgerLength;
     int pendingWriteRequests;
     boolean callbackTriggered;
     boolean hasRun;
     EnumSet<WriteFlag> writeFlags;
     boolean allowFailFast = false;
-    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, EnumSet<WriteFlag> writeFlags,
+
+    static PendingAddOp create(LedgerHandle lh, ClientContext clientCtx,
+                               ByteBuf payload, EnumSet<WriteFlag> writeFlags,
                                AddCallbackWithLatency cb, Object ctx) {
         PendingAddOp op = RECYCLER.get();
         op.lh = lh;
+        op.clientCtx = clientCtx;
         op.isRecoveryAdd = false;
         op.cb = cb;
         op.ctx = ctx;
@@ -96,9 +94,6 @@ static PendingAddOp create(LedgerHandle lh, ByteBuf payload, EnumSet<WriteFlag>
 
         op.completed = false;
         op.ackSet = lh.getDistributionSchedule().getAckSet();
-        op.addOpLogger = lh.getBk().getAddOpLogger();
-        op.addOpUrCounter = lh.getBk().getAddOpUrCounter();
-        op.timeoutNanos = lh.getBk().getAddEntryQuorumTimeoutNanos();
         op.pendingWriteRequests = 0;
         op.callbackTriggered = false;
         op.hasRun = false;
@@ -106,6 +101,7 @@ static PendingAddOp create(LedgerHandle lh, ByteBuf payload, EnumSet<WriteFlag>
         op.allowFailFast = false;
         op.qwcLatency = 0;
         op.writeFlags = writeFlags;
+
         return op;
     }
 
@@ -138,14 +134,14 @@ long getEntryId() {
     void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
 
-        lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
-                                         lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex,
-                                         flags, allowFailFast, lh.writeFlags);
+        clientCtx.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+                                             lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex,
+                                             flags, allowFailFast, lh.writeFlags);
         ++pendingWriteRequests;
     }
 
     boolean maybeTimeout() {
-        if (MathUtils.elapsedNanos(requestTimeNanos) >= timeoutNanos) {
+        if (MathUtils.elapsedNanos(requestTimeNanos) >= clientCtx.getConf().addEntryQuorumTimeoutNanos) {
             timeoutQuorumWait();
             return true;
         }
@@ -154,7 +150,7 @@ boolean maybeTimeout() {
 
     void timeoutQuorumWait() {
         try {
-            lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, new SafeRunnable() {
+            clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
                     if (completed) {
@@ -286,9 +282,9 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
             if (rc != BKException.Code.OK) {
                 // Got an error after satisfying AQ. This means we are under replicated at the create itself.
                 // Update the stat to reflect it.
-                addOpUrCounter.inc();
-                if (!lh.bk.getDisableEnsembleChangeFeature().isAvailable()
-                        && !lh.bk.delayEnsembleChange) {
+                clientCtx.getClientStats().getAddOpUrCounter().inc();
+                if (!clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()
+                        && !clientCtx.getConf().delayEnsembleChange) {
                     lh.getDelayedWriteFailedBookies().putIfAbsent(bookieIndex, addr);
                 }
             }
@@ -337,7 +333,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
             lh.handleUnrecoverableErrorDuringAdd(rc);
             return;
         default:
-            if (lh.bk.delayEnsembleChange) {
+            if (clientCtx.getConf().delayEnsembleChange) {
                 if (ackSet.failBookieAndCheck(bookieIndex, addr)
                         || rc == BKException.Code.WriteOnReadOnlyBookieException) {
                     Map<Integer, BookieSocketAddress> failedBookies = ackSet.getFailedBookies();
@@ -379,11 +375,11 @@ void submitCallback(final int rc) {
 
         long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
         if (rc != BKException.Code.OK) {
-            addOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            clientCtx.getClientStats().getAddOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
             LOG.error("Write of ledger entry to quorum failed: L{} E{}",
                       lh.getId(), entryId);
         } else {
-            addOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            clientCtx.getClientStats().getAddOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
         }
         cb.addCompleteWithLatency(rc, lh, entryId, qwcLatency, ctx);
         callbackTriggered = true;
@@ -467,9 +463,8 @@ private void recyclePendAddOpObject() {
         ackSet.recycle();
         ackSet = null;
         lh = null;
+        clientCtx = null;
         isRecoveryAdd = false;
-        addOpLogger = null;
-        addOpUrCounter = null;
         completed = false;
         pendingWriteRequests = 0;
         callbackTriggered = false;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index 7b3aa9fcd4..493643c934 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -20,6 +20,7 @@
 import io.netty.buffer.ByteBuf;
 
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
 import org.slf4j.Logger;
@@ -43,6 +44,7 @@
 class PendingReadLacOp implements ReadLacCallback {
     static final Logger LOG = LoggerFactory.getLogger(PendingReadLacOp.class);
     LedgerHandle lh;
+    BookieClient bookieClient;
     LacCallback cb;
     int numResponsesPending;
     volatile boolean completed = false;
@@ -57,8 +59,9 @@
         void getLacComplete(int rc, long lac);
     }
 
-    PendingReadLacOp(LedgerHandle lh, LacCallback cb) {
+    PendingReadLacOp(LedgerHandle lh, BookieClient bookieClient, LacCallback cb) {
         this.lh = lh;
+        this.bookieClient = bookieClient;
         this.cb = cb;
         this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
@@ -67,8 +70,7 @@
     public void initiate() {
         LedgerMetadata metadata = lh.getLedgerMetadata();
         for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readLac(metadata.currentEnsemble.get(i),
-                    lh.ledgerId, this, i);
+            bookieClient.readLac(metadata.currentEnsemble.get(i), lh.ledgerId, this, i);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 0aac38f7ab..d31a744f67 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -30,7 +30,6 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,8 +43,6 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +57,6 @@
 class PendingReadOp implements ReadEntryCallback, SafeRunnable {
     private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
 
-    private final ScheduledExecutorService scheduler;
     private ScheduledFuture<?> speculativeTask = null;
     protected final List<LedgerEntryRequest> seq;
     private final CompletableFuture<LedgerEntries> future;
@@ -68,13 +64,12 @@
     private final BitSet heardFromHostsBitSet;
     private final Set<BookieSocketAddress> sentToHosts = new HashSet<BookieSocketAddress>();
     LedgerHandle lh;
+    final ClientContext clientCtx;
+
     long numPendingEntries;
-    long startEntryId;
-    long endEntryId;
+    final long startEntryId;
+    final long endEntryId;
     long requestTimeNanos;
-    OpStatsLogger readOpLogger;
-    Counter readOpDmCounter;
-    private final Counter speculativeReadCounter;
 
     final int requiredBookiesMissingEntryForRecovery;
     final boolean isRecoveryRead;
@@ -101,9 +96,8 @@
             this.ensemble = ensemble;
             this.eId = eId;
 
-            if (lh.bk.isReorderReadSequence()) {
-                writeSet = lh.bk.getPlacementPolicy()
-                    .reorderReadSequence(
+            if (clientCtx.getConf().enableReorderReadSequence) {
+                writeSet = clientCtx.getPlacementPolicy().reorderReadSequence(
                             ensemble,
                             lh.getBookiesHealthInfo(),
                             lh.distributionSchedule.getWriteSet(eId));
@@ -138,7 +132,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer
             try {
                 content = lh.macManager.verifyDigestAndReturnData(eId, buffer);
             } catch (BKDigestMatchException e) {
-                readOpDmCounter.inc();
+                clientCtx.getClientStats().getReadOpDmCounter().inc();
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
                 return false;
             }
@@ -257,7 +251,7 @@ public String toString() {
          */
         @Override
         public ListenableFuture<Boolean> issueSpeculativeRequest() {
-            return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
+            return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws Exception {
                     if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) {
@@ -372,7 +366,7 @@ synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFrom) {
             // (even for other entries) from any of the other bookies we have sent the
             // request to
             if (sentTo.cardinality() == 0) {
-                speculativeReadCounter.inc();
+                clientCtx.getClientStats().getSpeculativeReadCounter().inc();
                 return sendNextRead();
             } else {
                 return null;
@@ -445,7 +439,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
                 for (int i = 0; i < numReplicasTried - 1; i++) {
                     int slowBookieIndex = writeSet.get(i);
                     BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
-                    lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, eId);
+                    clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId);
                 }
             }
             return completed;
@@ -453,39 +447,24 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
     }
 
     PendingReadOp(LedgerHandle lh,
-                  ScheduledExecutorService scheduler,
-                  long startEntryId,
-                  long endEntryId) {
-        this(
-            lh,
-            scheduler,
-            startEntryId,
-            endEntryId,
-            false);
-    }
-
-    PendingReadOp(LedgerHandle lh,
-                  ScheduledExecutorService scheduler,
+                  ClientContext clientCtx,
                   long startEntryId,
                   long endEntryId,
                   boolean isRecoveryRead) {
         this.seq = new ArrayList<>((int) ((endEntryId + 1) - startEntryId));
         this.future = new CompletableFuture<>();
         this.lh = lh;
+        this.clientCtx = clientCtx;
         this.startEntryId = startEntryId;
         this.endEntryId = endEntryId;
-        this.scheduler = scheduler;
         this.isRecoveryRead = isRecoveryRead;
+
         this.allowFailFast = false;
         numPendingEntries = endEntryId - startEntryId + 1;
         requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize()
                 - getLedgerMetadata().getAckQuorumSize() + 1;
         heardFromHosts = new HashSet<>();
         heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
-
-        readOpLogger = lh.bk.getReadOpLogger();
-        readOpDmCounter = lh.bk.getReadOpDmCounter();
-        speculativeReadCounter = lh.bk.getSpeculativeReadCounter();
     }
 
     CompletableFuture<LedgerEntries> future() {
@@ -503,6 +482,7 @@ protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
         }
     }
 
+    // NOTE: this does not appear to be used in production code (unverified). -Ivan
     PendingReadOp parallelRead(boolean enabled) {
         this.parallelRead = enabled;
         return this;
@@ -513,7 +493,7 @@ void allowFailFastOnUnwritableChannel() {
     }
 
     public void submit() {
-        lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
+        clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
     }
 
     void initiate() {
@@ -537,8 +517,9 @@ void initiate() {
         // read the entries.
         for (LedgerEntryRequest entry : seq) {
             entry.read();
-            if (!parallelRead && lh.bk.getReadSpeculativeRequestPolicy().isPresent()) {
-                lh.bk.getReadSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, entry);
+            if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
+                clientCtx.getConf().readSpeculativeRequestPolicy.get()
+                    .initiateSpeculativeRequest(clientCtx.getScheduler(), entry);
             }
         }
     }
@@ -577,9 +558,8 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entr
         }
 
         int flags = isRecoveryRead ? BookieProtocol.FLAG_HIGH_PRIORITY : BookieProtocol.FLAG_NONE;
-        lh.bk.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
-                                          this, new ReadContext(bookieIndex, to, entry),
-                                          flags);
+        clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+                                              this, new ReadContext(bookieIndex, to, entry), flags);
     }
 
     @Override
@@ -641,12 +621,12 @@ protected void submitCallback(int code) {
                             + "Heard from {} : bitset = {}. First unread entry is {}",
                     lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet,
                     firstUnread);
-            readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
             // release the entries
             seq.forEach(LedgerEntryRequest::close);
             future.completeExceptionally(BKException.create(code));
         } else {
-            readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
+            clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
             future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl)));
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index d1631306a1..730ad07d77 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -22,7 +22,6 @@
 import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,15 +48,16 @@
     int lastSeenError = BKException.Code.WriteException;
 
     LedgerHandle lh;
-    OpStatsLogger putLacOpLogger;
+    ClientContext clientCtx;
 
-    PendingWriteLacOp(LedgerHandle lh, AddLacCallback cb, Object ctx) {
+    PendingWriteLacOp(LedgerHandle lh, ClientContext clientCtx,
+                      AddLacCallback cb, Object ctx) {
         this.lh = lh;
+        this.clientCtx = clientCtx;
         this.cb = cb;
         this.ctx = ctx;
         this.lac = LedgerHandle.INVALID_ENTRY_ID;
         ackSet = lh.distributionSchedule.getAckSet();
-        putLacOpLogger = lh.bk.getWriteLacOpLogger();
     }
 
     void setLac(long lac) {
@@ -70,8 +70,8 @@ void setLac(long lac) {
     }
 
     void sendWriteLacRequest(int bookieIndex) {
-        lh.bk.getBookieClient().writeLac(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
-                                         lh.ledgerId, lh.ledgerKey, lac, toSend, this, bookieIndex);
+        clientCtx.getBookieClient().writeLac(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+                                             lh.ledgerId, lh.ledgerKey, lac, toSend, this, bookieIndex);
     }
 
     void initiate(ByteBufList toSend) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index b342b6f297..d71cc0ab90 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -25,7 +25,6 @@
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
@@ -46,7 +45,6 @@
 
     static final Logger LOG = LoggerFactory.getLogger(ReadLastConfirmedAndEntryOp.class);
 
-    private final ScheduledExecutorService scheduler;
     ReadLACAndEntryRequest request;
     final BitSet heardFromHostsBitSet;
     final BitSet emptyResponsesFromHostsBitSet;
@@ -56,6 +54,7 @@
 
     final long requestTimeNano;
     private final LedgerHandle lh;
+    private final ClientContext clientCtx;
     private final LastConfirmedAndEntryCallback cb;
 
     private int numResponsesPending;
@@ -82,8 +81,8 @@
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
             this.writeSet = lh.getDistributionSchedule().getEnsembleSet(eId);
-            if (lh.getBk().shouldReorderReadSequence()) {
-                this.orderedEnsemble = lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
+            if (clientCtx.getConf().enableReorderReadSequence) {
+                this.orderedEnsemble = clientCtx.getPlacementPolicy().reorderReadLACSequence(ensemble,
                         lh.getBookiesHealthInfo(), writeSet.copy());
             } else {
                 this.orderedEnsemble = writeSet.copy();
@@ -417,7 +416,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long
                 for (int i = 0; i < numReplicasTried; i++) {
                     int slowBookieIndex = orderedEnsemble.get(i);
                     BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
-                    lh.getBk().getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
+                    clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
                 }
             }
             return completed;
@@ -425,11 +424,12 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long
     }
 
     ReadLastConfirmedAndEntryOp(LedgerHandle lh,
+                                ClientContext clientCtx,
                                 LastConfirmedAndEntryCallback cb,
                                 long prevEntryId,
-                                long timeOutInMillis,
-                                ScheduledExecutorService scheduler) {
+                                long timeOutInMillis) {
         this.lh = lh;
+        this.clientCtx = clientCtx;
         this.cb = cb;
         this.prevEntryId = prevEntryId;
         this.lastAddConfirmed = lh.getLastAddConfirmed();
@@ -441,7 +441,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long
         this.numEmptyResponsesAllowed = getLedgerMetadata().getEnsembleSize()
                 - getLedgerMetadata().getAckQuorumSize() + 1;
         this.requestTimeNano = MathUtils.nowInNano();
-        this.scheduler = scheduler;
+
         maxMissedReadsAllowed = getLedgerMetadata().getEnsembleSize()
             - getLedgerMetadata().getAckQuorumSize();
         heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
@@ -462,7 +462,7 @@ ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
      */
     @Override
     public ListenableFuture<Boolean> issueSpeculativeRequest() {
-        return lh.getBk().getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
+        return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
                 if (!requestComplete.get() && !request.isComplete()
@@ -486,8 +486,9 @@ public void initiate() {
         }
         request.read();
 
-        if (!parallelRead && lh.getBk().getReadLACSpeculativeRequestPolicy().isPresent()) {
-            lh.getBk().getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this);
+        if (!parallelRead && clientCtx.getConf().readLACSpeculativeRequestPolicy.isPresent()) {
+            clientCtx.getConf().readLACSpeculativeRequestPolicy.get()
+                .initiateSpeculativeRequest(clientCtx.getScheduler(), this);
         }
     }
 
@@ -496,7 +497,7 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to, ReadLACAndEntryRequest
             LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}",
                     prevEntryId, timeOutInMillis, to, parallelRead);
         }
-        lh.getBk().getBookieClient().readEntryWaitForLACUpdate(to,
+        clientCtx.getBookieClient().readEntryWaitForLACUpdate(to,
             lh.getId(),
             BookieProtocol.LAST_ADD_CONFIRMED,
             prevEntryId,
@@ -517,12 +518,12 @@ private void submitCallback(int rc) {
         long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
         LedgerEntry entry;
         if (BKException.Code.OK != rc) {
-            lh.getBk().getReadLacAndEntryOpLogger()
+            clientCtx.getClientStats().getReadLacAndEntryOpLogger()
                 .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
             entry = null;
         } else {
             // could received advanced lac, with no entry
-            lh.getBk().getReadLacAndEntryOpLogger()
+            clientCtx.getClientStats().getReadLacAndEntryOpLogger()
                 .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
             if (request.entryImpl.getEntryBuffer() != null) {
                 entry = new LedgerEntry(request.entryImpl);
@@ -564,7 +565,7 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffe
                         long elapsedMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()
                                 - rCtx.getLacUpdateTimestamp().get());
                         elapsedMicros = Math.max(elapsedMicros, 0);
-                        lh.getBk().getReadLacAndEntryRespLogger()
+                        clientCtx.getClientStats().getReadLacAndEntryRespLogger()
                                 .registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 112a1452e4..b72639de45 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -20,6 +20,7 @@
 import io.netty.buffer.ByteBuf;
 
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
@@ -33,6 +34,7 @@
 class ReadLastConfirmedOp implements ReadEntryCallback {
     static final Logger LOG = LoggerFactory.getLogger(ReadLastConfirmedOp.class);
     LedgerHandle lh;
+    BookieClient bookieClient;
     int numResponsesPending;
     RecoveryData maxRecoveredData;
     volatile boolean completed = false;
@@ -48,8 +50,10 @@
         void readLastConfirmedDataComplete(int rc, RecoveryData data);
     }
 
-    public ReadLastConfirmedOp(LedgerHandle lh, LastConfirmedDataCallback cb) {
+    public ReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
+                               LastConfirmedDataCallback cb) {
         this.cb = cb;
+        this.bookieClient = bookieClient;
         this.maxRecoveredData = new RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
         this.lh = lh;
         this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
@@ -59,21 +63,21 @@ public ReadLastConfirmedOp(LedgerHandle lh, LastConfirmedDataCallback cb) {
     public void initiate() {
         LedgerMetadata metadata = lh.getLedgerMetadata();
         for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
-                                         lh.ledgerId,
-                                         BookieProtocol.LAST_ADD_CONFIRMED,
-                                         this, i, BookieProtocol.FLAG_NONE);
+            bookieClient.readEntry(metadata.currentEnsemble.get(i),
+                                   lh.ledgerId,
+                                   BookieProtocol.LAST_ADD_CONFIRMED,
+                                   this, i, BookieProtocol.FLAG_NONE);
         }
     }
 
     public void initiateWithFencing() {
         LedgerMetadata metadata = lh.getLedgerMetadata();
         for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
-                                              lh.ledgerId,
-                                              BookieProtocol.LAST_ADD_CONFIRMED,
-                                              this, i, BookieProtocol.FLAG_DO_FENCING,
-                                              lh.ledgerKey);
+            bookieClient.readEntry(metadata.currentEnsemble.get(i),
+                                   lh.ledgerId,
+                                   BookieProtocol.LAST_ADD_CONFIRMED,
+                                   this, i, BookieProtocol.FLAG_DO_FENCING,
+                                   lh.ledgerKey);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 3cc59a042b..fba7ebd4f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -28,7 +28,6 @@
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
@@ -79,24 +78,26 @@ public String toString() {
         }
     }
 
-    ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
-                         DigestType digestType, byte[] password, boolean watch)
+    ReadOnlyLedgerHandle(ClientContext clientCtx,
+                         long ledgerId, LedgerMetadata metadata,
+                         BookKeeper.DigestType digestType, byte[] password,
+                         boolean watch)
             throws GeneralSecurityException, NumberFormatException {
-        super(bk, ledgerId, metadata, digestType, password, WriteFlag.NONE);
+        super(clientCtx, ledgerId, metadata, digestType, password, WriteFlag.NONE);
         if (watch) {
-            bk.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
+            clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
         }
     }
 
     @Override
     public void close()
             throws InterruptedException, BKException {
-        bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
+        clientCtx.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
     }
 
     @Override
     public void asyncClose(CloseCallback cb, Object ctx) {
-        bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
+        clientCtx.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
         cb.closeComplete(BKException.Code.OK, this, ctx);
     }
 
@@ -168,7 +169,7 @@ public void onChanged(long lid, LedgerMetadata newMetadata) {
         }
         if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
             try {
-                bk.getMainWorkerPool().executeOrdered(ledgerId, new MetadataUpdater(newMetadata));
+                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new MetadataUpdater(newMetadata));
             } catch (RejectedExecutionException ree) {
                 LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
                         ledgerId, newMetadata);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index b8a5f0d074..181b343aa2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -20,6 +20,7 @@
 import io.netty.buffer.ByteBuf;
 
 import org.apache.bookkeeper.client.ReadLastConfirmedOp.LastConfirmedDataCallback;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
@@ -35,6 +36,7 @@
     static final Logger LOG = LoggerFactory.getLogger(TryReadLastConfirmedOp.class);
 
     final LedgerHandle lh;
+    final BookieClient bookieClient;
     final LastConfirmedDataCallback cb;
 
     int numResponsesPending;
@@ -42,8 +44,10 @@
     volatile boolean completed = false;
     RecoveryData maxRecoveredData;
 
-    TryReadLastConfirmedOp(LedgerHandle lh, LastConfirmedDataCallback cb, long lac) {
+    TryReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
+                           LastConfirmedDataCallback cb, long lac) {
         this.lh = lh;
+        this.bookieClient = bookieClient;
         this.cb = cb;
         this.maxRecoveredData = new RecoveryData(lac, 0);
         this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
@@ -52,10 +56,10 @@
     public void initiate() {
         LedgerMetadata metadata = lh.getLedgerMetadata();
         for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
-                                         lh.ledgerId,
-                                         BookieProtocol.LAST_ADD_CONFIRMED,
-                                         this, i, BookieProtocol.FLAG_NONE);
+            bookieClient.readEntry(metadata.currentEnsemble.get(i),
+                                   lh.ledgerId,
+                                   BookieProtocol.LAST_ADD_CONFIRMED,
+                                   this, i, BookieProtocol.FLAG_NONE);
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index 2f72df6159..25da4baac0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -441,6 +441,7 @@ private void batchRecovery(int batchSize) throws Exception {
         ClientConfiguration newConf = new ClientConfiguration()
             .setReadEntryTimeout(60000)
             .setAddEntryTimeout(60000)
+            .setEnableParallelRecoveryRead(false)
             .setRecoveryReadBatchSize(batchSize);
 
         newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
@@ -480,14 +481,19 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
 
         final CountDownLatch recoverLatch = new CountDownLatch(1);
         final AtomicBoolean success = new AtomicBoolean(false);
-        LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(recoverLh,
+
+        MockClientContext parallelReadCtx = MockClientContext.copyOf(bkc.getClientCtx())
+            .setConf(ClientInternalConf.fromConfig(newConf.setEnableParallelRecoveryRead(true)));
+
+        LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(
+                recoverLh, parallelReadCtx,
                 new BookkeeperInternalCallbacks.GenericCallback<Void>() {
-            @Override
-            public void operationComplete(int rc, Void result) {
-                success.set(BKException.Code.OK == rc);
-                recoverLatch.countDown();
-            }
-        }).parallelRead(true).readBatchSize(newConf.getRecoveryReadBatchSize());
+                    @Override
+                    public void operationComplete(int rc, Void result) {
+                        success.set(BKException.Code.OK == rc);
+                        recoverLatch.countDown();
+                    }
+                });
         recoveryOp.initiate();
         recoverLatch.await(10, TimeUnit.SECONDS);
         assertTrue(success.get());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 20a8bb87d5..783e435d6c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -30,8 +30,6 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.base.Optional;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -58,7 +56,6 @@
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -150,22 +147,66 @@ public void setup() throws Exception {
 
         bk = mock(BookKeeper.class);
 
-        NullStatsLogger nullStatsLogger = setupLoggers();
-
         failedBookies = new ArrayList<>();
         availableBookies = new HashSet<>();
 
         when(bk.getCloseLock()).thenReturn(new ReentrantReadWriteLock());
         when(bk.isClosed()).thenReturn(false);
         when(bk.getBookieWatcher()).thenReturn(bookieWatcher);
-        when(bk.getDisableEnsembleChangeFeature()).thenReturn(mock(Feature.class));
-        when(bk.getExplicitLacInterval()).thenReturn(0);
         when(bk.getMainWorkerPool()).thenReturn(executor);
         when(bk.getBookieClient()).thenReturn(bookieClient);
         when(bk.getScheduler()).thenReturn(scheduler);
-        when(bk.getReadSpeculativeRequestPolicy()).thenReturn(Optional.absent());
-        mockBookKeeperGetConf(new ClientConfiguration());
-        when(bk.getStatsLogger()).thenReturn(nullStatsLogger);
+
+        setBookKeeperConfig(new ClientConfiguration());
+        when(bk.getStatsLogger()).thenReturn(NullStatsLogger.INSTANCE);
+        BookKeeperClientStats clientStats = BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE);
+        ClientContext clientCtx = new ClientContext() {
+                @Override
+                public ClientInternalConf getConf() {
+                    return ClientInternalConf.fromConfig(bk.getConf());
+                }
+
+                @Override
+                public LedgerManager getLedgerManager() {
+                    return ledgerManager;
+                }
+
+                @Override
+                public BookieWatcher getBookieWatcher() {
+                    return bookieWatcher;
+                }
+
+                @Override
+                public EnsemblePlacementPolicy getPlacementPolicy() {
+                    return null;
+                }
+
+                @Override
+                public BookieClient getBookieClient() {
+                    return bookieClient;
+                }
+
+                @Override
+                public OrderedExecutor getMainWorkerPool() {
+                    return scheduler;
+                }
+
+                @Override
+                public OrderedScheduler getScheduler() {
+                    return scheduler;
+                }
+
+                @Override
+                public BookKeeperClientStats getClientStats() {
+                    return clientStats;
+                }
+
+                @Override
+                public boolean isClientClosed() {
+                    return bk.isClosed();
+                }
+            };
+        when(bk.getClientCtx()).thenReturn(clientCtx);
         when(bk.getLedgerManager()).thenReturn(ledgerManager);
         when(bk.getLedgerIdGenerator()).thenReturn(ledgerIdGenerator);
         when(bk.getReturnRc(anyInt())).thenAnswer(invocationOnMock -> invocationOnMock.getArgument(0));
@@ -184,24 +225,10 @@ public void setup() throws Exception {
         setupBookieClientForceLedger();
     }
 
-    protected void mockBookKeeperGetConf(ClientConfiguration conf) {
+    protected void setBookKeeperConfig(ClientConfiguration conf) {
         when(bk.getConf()).thenReturn(conf);
     }
 
-    protected NullStatsLogger setupLoggers() {
-        NullStatsLogger nullStatsLogger = NullStatsLogger.INSTANCE;
-        when(bk.getOpenOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getRecoverOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getAddOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getReadOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getDeleteOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getCreateOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getRecoverAddCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getRecoverReadCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
-        when(bk.getAddOpUrCounter()).thenReturn(nullStatsLogger.getCounter("mock"));
-        return nullStatsLogger;
-    }
-
     private DigestManager getDigestType(long ledgerId) throws GeneralSecurityException {
         LedgerMetadata metadata = mockLedgerMetadataRegistry.get(ledgerId);
         return DigestManager.instantiate(
@@ -218,10 +245,6 @@ public void tearDown() {
         executor.shutdown();
     }
 
-    protected void setBookkeeperConfig(ClientConfiguration config) {
-        when(bk.getConf()).thenReturn(config);
-    }
-
     protected CreateBuilder newCreateLedgerOp() {
         return new LedgerCreateOp.CreateBuilderImpl(bk);
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
new file mode 100644
index 0000000000..040402dc10
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.function.BooleanSupplier;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieClient;
+
+class MockClientContext implements ClientContext {
+    private ClientInternalConf conf;
+    private LedgerManager ledgerManager;
+    private BookieWatcher bookieWatcher;
+    private EnsemblePlacementPolicy placementPolicy;
+    private BookieClient bookieClient;
+    private OrderedExecutor mainWorkerPool;
+    private OrderedScheduler scheduler;
+    private BookKeeperClientStats clientStats;
+    private BooleanSupplier isClientClosed;
+
+    static MockClientContext copyOf(ClientContext other) {
+        return new MockClientContext()
+            .setConf(other.getConf())
+            .setLedgerManager(other.getLedgerManager())
+            .setBookieWatcher(other.getBookieWatcher())
+            .setPlacementPolicy(other.getPlacementPolicy())
+            .setBookieClient(other.getBookieClient())
+            .setMainWorkerPool(other.getMainWorkerPool())
+            .setScheduler(other.getScheduler())
+            .setClientStats(other.getClientStats())
+            .setIsClientClosed(other::isClientClosed);
+    }
+
+    MockClientContext setConf(ClientInternalConf conf) {
+        this.conf = conf;
+        return this;
+    }
+
+    MockClientContext setLedgerManager(LedgerManager ledgerManager) {
+        this.ledgerManager = ledgerManager;
+        return this;
+    }
+
+    MockClientContext setBookieWatcher(BookieWatcher bookieWatcher) {
+        this.bookieWatcher = bookieWatcher;
+        return this;
+    }
+
+    MockClientContext setPlacementPolicy(EnsemblePlacementPolicy placementPolicy) {
+        this.placementPolicy = placementPolicy;
+        return this;
+    }
+
+    MockClientContext setBookieClient(BookieClient bookieClient) {
+        this.bookieClient = bookieClient;
+        return this;
+    }
+
+    MockClientContext setMainWorkerPool(OrderedExecutor mainWorkerPool) {
+        this.mainWorkerPool = mainWorkerPool;
+        return this;
+    }
+
+    MockClientContext setScheduler(OrderedScheduler scheduler) {
+        this.scheduler = scheduler;
+        return this;
+    }
+
+    MockClientContext setClientStats(BookKeeperClientStats clientStats) {
+        this.clientStats = clientStats;
+        return this;
+    }
+
+    MockClientContext setIsClientClosed(BooleanSupplier isClientClosed) {
+        this.isClientClosed = isClientClosed;
+        return this;
+    }
+
+    @Override
+    public ClientInternalConf getConf() {
+        return this.conf;
+    }
+
+    @Override
+    public LedgerManager getLedgerManager() {
+        return this.ledgerManager;
+    }
+
+    @Override
+    public BookieWatcher getBookieWatcher() {
+        return this.bookieWatcher;
+    }
+
+    @Override
+    public EnsemblePlacementPolicy getPlacementPolicy() {
+        return this.placementPolicy;
+    }
+
+    @Override
+    public BookieClient getBookieClient() {
+        return this.bookieClient;
+    }
+
+    @Override
+    public OrderedExecutor getMainWorkerPool() {
+        return this.mainWorkerPool;
+    }
+
+    @Override
+    public OrderedScheduler getScheduler() {
+        return this.scheduler;
+    }
+
+    @Override
+    public BookKeeperClientStats getClientStats() {
+        return clientStats;
+    }
+
+    @Override
+    public boolean isClientClosed() {
+        return isClientClosed.getAsBoolean();
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index a0d339c299..47350208dc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -59,8 +59,9 @@
     boolean fenced = false;
 
     MockLedgerHandle(MockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException {
-        super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(),
-                WriteFlag.NONE);
+        super(bk.getClientCtx(), id,
+              new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(),
+              WriteFlag.NONE);
         this.bk = bk;
         this.id = id;
         this.digest = digest;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index b87c6661da..3213c46c69 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -428,7 +428,7 @@ public void testRecoveryOnEntryGap() throws Exception {
         final CountDownLatch addLatch = new CountDownLatch(1);
         final AtomicBoolean addSuccess = new AtomicBoolean(false);
         LOG.info("Add entry {} with lac = {}", entryId, lac);
-        lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(0),
+        bkc.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(0),
                                          lh.getId(), lh.ledgerKey, entryId, toSend,
                                          new WriteCallback() {
                                              @Override
@@ -667,7 +667,7 @@ public void operationComplete(int rc, Void result) {
         final AtomicLong lacHolder = new AtomicLong(-1234L);
         final AtomicInteger rcHolder = new AtomicInteger(-1234);
         final CountDownLatch doneLatch = new CountDownLatch(1);
-        new ReadLastConfirmedOp(readLh, new ReadLastConfirmedOp.LastConfirmedDataCallback() {
+        new ReadLastConfirmedOp(readLh, bkc.getBookieClient(), new ReadLastConfirmedOp.LastConfirmedDataCallback() {
             @Override
             public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                 rcHolder.set(rc);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
index f428b1f499..a6bc088a66 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
@@ -30,6 +30,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,18 +41,23 @@
  */
 public class PendingAddOpTest {
 
-    private BookKeeper bk;
     private LedgerHandle lh;
+    private ClientContext mockClientContext;
+
     private ByteBuf payload;
 
     @Before
     public void setup() {
-        bk = mock(BookKeeper.class);
-        when(bk.getAddEntryQuorumTimeoutNanos()).thenReturn(1000L);
-        when(bk.getAddOpLogger()).thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("test"));
-        when(bk.getAddOpUrCounter()).thenReturn(NullStatsLogger.INSTANCE.getCounter("test"));
+        BookKeeperClientStats clientStats = BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE);
+        BookieClient bookieClient = mock(BookieClient.class);
+        OrderedExecutor mainWorkerPool = mock(OrderedExecutor.class);
+        mockClientContext = mock(ClientContext.class);
+        when(mockClientContext.getBookieClient()).thenReturn(bookieClient);
+        when(mockClientContext.getConf()).thenReturn(ClientInternalConf.defaultValues());
+        when(mockClientContext.getMainWorkerPool()).thenReturn(mainWorkerPool);
+        when(mockClientContext.getClientStats()).thenReturn(clientStats);
+
         lh = mock(LedgerHandle.class);
-        when(lh.getBk()).thenReturn(bk);
         when(lh.getDistributionSchedule())
             .thenReturn(new RoundRobinDistributionSchedule(3, 3, 2));
         byte[] data = "test-pending-add-op".getBytes(UTF_8);
@@ -62,9 +69,11 @@ public void setup() {
     public void testExecuteAfterCancelled() {
         AtomicInteger rcHolder = new AtomicInteger(-0xdead);
         PendingAddOp op = PendingAddOp.create(
-            lh, payload, WriteFlag.NONE, (rc, handle, entryId, qwcLatency, ctx) -> {
-                rcHolder.set(rc);
-            }, null);
+                lh, mockClientContext,
+                payload, WriteFlag.NONE,
+                (rc, handle, entryId, qwcLatency, ctx) -> {
+                    rcHolder.set(rc);
+                }, null);
         assertSame(lh, op.lh);
 
         // cancel the op.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index f0a03b030d..7d524a5778 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -30,7 +30,6 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.base.Optional;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
@@ -52,6 +51,7 @@
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -59,7 +59,6 @@
 import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.proto.checksum.DummyDigestManager;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.junit.After;
@@ -75,13 +74,14 @@
     private static final long LEDGERID = System.currentTimeMillis();
 
     private final TestStatsProvider testStatsProvider = new TestStatsProvider();
-    private OpStatsLogger readLacAndEntryOpLogger;
+    private BookKeeperClientStats clientStats;
+    private ClientContext mockClientCtx;
     private BookieClient mockBookieClient;
-    private BookKeeper mockBk;
     private LedgerHandle mockLh;
     private ScheduledExecutorService scheduler;
     private OrderedScheduler orderedScheduler;
-    private SpeculativeRequestExecutionPolicy speculativePolicy;
+    private ClientInternalConf internalConf;
+    private EnsemblePlacementPolicy mockPlacementPolicy;
     private LedgerMetadata ledgerMetadata;
     private DistributionSchedule distributionSchedule;
     private DigestManager digestManager;
@@ -89,11 +89,15 @@
     @Before
     public void setup() throws Exception {
         // stats
-        this.readLacAndEntryOpLogger = testStatsProvider
-            .getStatsLogger("").getOpStatsLogger("readLacAndEntry");
+        clientStats = BookKeeperClientStats.newInstance(testStatsProvider.getStatsLogger(""));
         // policy
-        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
-            100, 200, 2);
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setFirstSpeculativeReadLACTimeout(100);
+        conf.setMaxSpeculativeReadLACTimeout(200);
+        conf.setSpeculativeReadLACTimeoutBackoffMultiplier(2);
+
+        internalConf = ClientInternalConf.fromConfig(conf);
+
         // metadata
         this.ledgerMetadata =
             new LedgerMetadata(3, 3, 2, DigestType.CRC32, new byte[0]);
@@ -111,16 +115,16 @@ public void setup() throws Exception {
             .build();
 
         this.mockBookieClient = mock(BookieClient.class);
-
-        this.mockBk = mock(BookKeeper.class);
-        when(mockBk.getReadLACSpeculativeRequestPolicy()).thenReturn(Optional.of(speculativePolicy));
-        when(mockBk.getBookieClient()).thenReturn(mockBookieClient);
-        when(mockBk.getReadLacAndEntryOpLogger()).thenReturn(readLacAndEntryOpLogger);
-        when(mockBk.getMainWorkerPool()).thenReturn(orderedScheduler);
-        EnsemblePlacementPolicy mockPlacementPolicy = mock(EnsemblePlacementPolicy.class);
-        when(mockBk.getPlacementPolicy()).thenReturn(mockPlacementPolicy);
+        this.mockPlacementPolicy = mock(EnsemblePlacementPolicy.class);
+        this.mockClientCtx = mock(ClientContext.class);
+        when(mockClientCtx.getBookieClient()).thenReturn(mockBookieClient);
+        when(mockClientCtx.getPlacementPolicy()).thenReturn(mockPlacementPolicy);
+        when(mockClientCtx.getConf()).thenReturn(internalConf);
+        when(mockClientCtx.getScheduler()).thenReturn(orderedScheduler);
+        when(mockClientCtx.getMainWorkerPool()).thenReturn(orderedScheduler);
+        when(mockClientCtx.getClientStats()).thenReturn(clientStats);
         this.mockLh = mock(LedgerHandle.class);
-        when(mockLh.getBk()).thenReturn(mockBk);
+
         when(mockLh.getId()).thenReturn(LEDGERID);
         when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
         when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
@@ -195,12 +199,7 @@ public void testSpeculativeResponses() throws Exception {
         };
 
         ReadLastConfirmedAndEntryOp op = new ReadLastConfirmedAndEntryOp(
-            mockLh,
-            resultCallback,
-            1L,
-            10000,
-            scheduler
-        );
+                mockLh, mockClientCtx, resultCallback, 1L, 10000);
         op.initiate();
 
         // wait until all speculative requests are sent
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index b26798b824..9782ca7e13 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -244,8 +244,8 @@ public boolean isClosed() {
                 return true;
             }
         };
-        LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
-                TEST_PSSWD, WriteFlag.NONE);
+        LedgerHandle lh = new LedgerHandle(bkc.getClientCtx(), 0, metadata, TEST_DIGEST_TYPE,
+                                           TEST_PSSWD, WriteFlag.NONE);
         testSplitIntoSubFragments(10, 21, -1, 1, lh);
         testSplitIntoSubFragments(10, 21, 20, 1, lh);
         testSplitIntoSubFragments(0, 0, 10, 1, lh);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
index 0da50b57cd..d3e810c13f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
@@ -43,7 +43,7 @@ public void testChangeEnsembleMaxNumWithWriter() throws Exception {
         long lId;
         int numEntries = 5;
         int changeNum = 5;
-        setBookkeeperConfig(new ClientConfiguration().setDelayEnsembleChange(false).setMaxAllowedEnsembleChanges(5));
+        setBookKeeperConfig(new ClientConfiguration().setDelayEnsembleChange(false).setMaxAllowedEnsembleChanges(5));
         try (WriteHandle writer = result(newCreateLedgerOp()
                 .withAckQuorumSize(3)
                 .withWriteQuorumSize(3)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index ba500fc8dc..68fd29c942 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -66,6 +66,14 @@ long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntrie
         return lh.getId();
     }
 
+    PendingReadOp createReadOp(LedgerHandle lh, long from, long to) {
+        return new PendingReadOp(lh, bkc.getClientCtx(), from, to, false);
+    }
+
+    PendingReadOp createRecoveryReadOp(LedgerHandle lh, long from, long to) {
+        return new PendingReadOp(lh, bkc.getClientCtx(), from, to, true);
+    }
+
     @Test
     public void testNormalParallelRead() throws Exception {
         int numEntries = 10;
@@ -75,8 +83,7 @@ public void testNormalParallelRead() throws Exception {
 
         // read single entry
         for (int i = 0; i < numEntries; i++) {
-            PendingReadOp readOp =
-                    new PendingReadOp(lh, lh.bk.scheduler, i, i);
+            PendingReadOp readOp = createReadOp(lh, i, i);
             readOp.parallelRead(true).submit();
             Iterator<LedgerEntry> entries = readOp.future().get().iterator();
             assertTrue(entries.hasNext());
@@ -88,8 +95,7 @@ public void testNormalParallelRead() throws Exception {
         }
 
         // read multiple entries
-        PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        PendingReadOp readOp = createReadOp(lh, 0, numEntries - 1);
         readOp.parallelRead(true).submit();
         Iterator<LedgerEntry> iterator = readOp.future().get().iterator();
 
@@ -125,13 +131,12 @@ public void testParallelReadMissingEntries() throws Exception {
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
         // read single entry
-        PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 11, 11);
+        PendingReadOp readOp = createReadOp(lh, 11, 11);
         readOp.parallelRead(true).submit();
         expectFail(readOp.future(), Code.NoSuchEntryException);
 
         // read multiple entries
-        readOp = new PendingReadOp(lh, lh.bk.scheduler, 8, 11);
+        readOp = createReadOp(lh, 8, 11);
         readOp.parallelRead(true).submit();
         expectFail(readOp.future(), Code.NoSuchEntryException);
 
@@ -158,8 +163,7 @@ public void testFailParallelRecoveryReadMissingEntryImmediately() throws Excepti
         sleepBookie(ensemble.get(0), latch1);
         sleepBookie(ensemble.get(1), latch2);
 
-        PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 10, 10, true);
+        PendingReadOp readOp = createRecoveryReadOp(lh, 10, 10);
         readOp.parallelRead(true).submit();
         // would fail immediately if found missing entries don't cover ack quorum
         expectFail(readOp.future(), Code.NoSuchEntryException);
@@ -189,8 +193,7 @@ public void testParallelReadWithFailedBookies() throws Exception {
         killBookie(ensemble.get(1));
 
         // read multiple entries
-        PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        PendingReadOp readOp = createReadOp(lh, 0, numEntries - 1);
         readOp.parallelRead(true).submit();
         Iterator<LedgerEntry> entries = readOp.future().get().iterator();
 
@@ -227,8 +230,7 @@ public void testParallelReadFailureWithFailedBookies() throws Exception {
         killBookie(ensemble.get(2));
 
         // read multiple entries
-        PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        PendingReadOp readOp = createReadOp(lh, 0, numEntries - 1);
         readOp.parallelRead(true).submit();
         expectFail(readOp.future(), Code.BookieHandleNotAvailableException);
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index 49a435ec4e..e0f3dc69f3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -51,7 +51,8 @@ public void testPendingReadLacOpMissingExplicitLAC() throws Exception {
         lh.append(data);
 
         final CompletableFuture<Long> result = new CompletableFuture<>();
-        PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
+        PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(),
+                                                    (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
                 for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
@@ -70,7 +71,7 @@ public void initiate() {
                                 index);
 
                     }, 0, TimeUnit.SECONDS);
-                    lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
+                    bookieClient.readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
                             lh.ledgerId, this, i);
                 }
             }
@@ -87,7 +88,7 @@ public void testPendingReadLacOpMissingLAC() throws Exception {
         lh.append(data);
 
         final CompletableFuture<Long> result = new CompletableFuture<>();
-        PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
+        PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(), (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
                 for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
@@ -101,7 +102,7 @@ public void initiate() {
                                 null,
                                 index);
                     }, 0, TimeUnit.SECONDS);
-                    lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
+                    bookieClient.readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
                             lh.ledgerId, this, i);
                 }
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
index 66956b2188..7f28cc3e07 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
@@ -109,6 +109,10 @@ boolean isInOrder() {
         }
     }
 
+    ListenerBasedPendingReadOp createReadOp(LedgerHandle lh, long from, long to, ReadEntryListener listener) {
+        return new ListenerBasedPendingReadOp(lh, bkc.getClientCtx(), from, to, listener, null, false);
+    }
+
     void basicReadTest(boolean parallelRead) throws Exception {
         int numEntries = 10;
 
@@ -118,8 +122,7 @@ void basicReadTest(boolean parallelRead) throws Exception {
         // read single entry
         for (int i = 0; i < numEntries; i++) {
             LatchListener listener = new LatchListener(i, 1);
-            ListenerBasedPendingReadOp readOp =
-                    new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, i, i, listener, null);
+            ListenerBasedPendingReadOp readOp = createReadOp(lh, i, i, listener);
             readOp.parallelRead(parallelRead).submit();
             listener.expectComplete();
             assertEquals(1, listener.resultCodes.size());
@@ -132,8 +135,7 @@ void basicReadTest(boolean parallelRead) throws Exception {
 
         // read multiple entries
         LatchListener listener = new LatchListener(0L, numEntries);
-        ListenerBasedPendingReadOp readOp =
-                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
+        ListenerBasedPendingReadOp readOp = createReadOp(lh, 0, numEntries - 1, listener);
         readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
@@ -166,8 +168,7 @@ private void readMissingEntriesTest(boolean parallelRead) throws Exception {
 
         // read single entry
         LatchListener listener = new LatchListener(11L, 1);
-        ListenerBasedPendingReadOp readOp =
-                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 11, listener, null);
+        ListenerBasedPendingReadOp readOp = createReadOp(lh, 11, 11, listener);
         readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(1, listener.resultCodes.size());
@@ -178,7 +179,7 @@ private void readMissingEntriesTest(boolean parallelRead) throws Exception {
 
         // read multiple missing entries
         listener = new LatchListener(11L, 3);
-        readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 13, listener, null);
+        readOp = createReadOp(lh, 11, 13, listener);
         readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(3, listener.resultCodes.size());
@@ -192,7 +193,7 @@ private void readMissingEntriesTest(boolean parallelRead) throws Exception {
 
         // read multiple entries with missing entries
         listener = new LatchListener(5L, 10);
-        readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 5L, 14L, listener, null);
+        readOp = createReadOp(lh, 5L, 14L, listener);
         readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(10, listener.resultCodes.size());
@@ -237,8 +238,7 @@ private void readWithFailedBookiesTest(boolean parallelRead) throws Exception {
 
         // read multiple entries
         LatchListener listener = new LatchListener(0L, numEntries);
-        ListenerBasedPendingReadOp readOp =
-                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
+        ListenerBasedPendingReadOp readOp = createReadOp(lh, 0, numEntries - 1, listener);
         readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
@@ -278,8 +278,7 @@ private void readFailureWithFailedBookiesTest(boolean parallelRead) throws Excep
 
         // read multiple entries
         LatchListener listener = new LatchListener(0L, numEntries);
-        ListenerBasedPendingReadOp readOp =
-            new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, listener, null);
+        ListenerBasedPendingReadOp readOp = createReadOp(lh, 0, numEntries - 1, listener);
         readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index aa2105916f..5d251a954a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -159,7 +159,7 @@ public void testSpeculativeRead() throws Exception {
             speccb.expectSuccess(4000);
             nospeccb.expectTimeout(4000);
             // Check that the second bookie is registered as slow at entryId 1
-            RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) lspec.bk.placementPolicy;
+            RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy();
             assertTrue(rep.slowBookies.asMap().size() == 1);
 
             assertTrue(
@@ -220,7 +220,7 @@ public void testSpeculativeReadMultipleReplicasDown() throws Exception {
             Set<BookieSocketAddress> expectedSlowBookies = new HashSet<>();
             expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(1));
             expectedSlowBookies.add(l.getLedgerMetadata().getEnsembles().get(0L).get(2));
-            assertEquals(((RackawareEnsemblePlacementPolicy) l.bk.placementPolicy).slowBookies.asMap().keySet(),
+            assertEquals(((RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy()).slowBookies.asMap().keySet(),
                 expectedSlowBookies);
 
             // third should not hit timeouts since bookies 1 & 2 are registered as slow
@@ -318,8 +318,7 @@ public void testSpeculativeReadScheduling() throws Exception {
         secondHostOnly.set(1, true);
         PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
         try {
-            PendingReadOp op = new PendingReadOp(l, bkspec.scheduler, 0, 5);
-
+            PendingReadOp op = new PendingReadOp(l, bkspec.getClientCtx(), 0, 5, false);
             // if we've already heard from all hosts,
             // we only send the initial read
             req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index 93fc4ec2ba..5429c0dba2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -212,7 +212,7 @@ public void testOpenLedgerDigestUnmatchedWhenAutoDetectionDisabled() throws Exce
     private void testOpenLedgerDigestUnmatched(boolean autodetection) throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         conf.setEnableDigestTypeAutodetection(autodetection);
-        mockBookKeeperGetConf(conf);
+        setBookKeeperConfig(conf);
 
         long lId;
         try (WriteHandle writer = result(newCreateLedgerOp()
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index b0385b1083..65ea441a86 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -152,7 +152,7 @@ public void testFailCustomMetadataNull() throws Exception {
     public void testFailDigestTypeNullAndAutodetectionTrue() throws Exception {
         ClientConfiguration config = new ClientConfiguration();
         config.setEnableDigestTypeAutodetection(true);
-        setBookkeeperConfig(config);
+        setBookKeeperConfig(config);
         result(newCreateLedgerOp()
             .withDigestType(null)
             .withPassword(password)
@@ -163,7 +163,7 @@ public void testFailDigestTypeNullAndAutodetectionTrue() throws Exception {
     public void testFailDigestTypeNullAndAutodetectionFalse() throws Exception {
         ClientConfiguration config = new ClientConfiguration();
         config.setEnableDigestTypeAutodetection(false);
-        setBookkeeperConfig(config);
+        setBookKeeperConfig(config);
         result(newCreateLedgerOp()
             .withDigestType(null)
             .withPassword(password)
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 88a1344883..ffe2d88e46 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -33,7 +33,6 @@
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -77,10 +76,10 @@ public InetSocketAddress getBookieAddress() {
         }
     }
 
-    private final BookieClient bookieClient;
+    private final ClientContext clientCtx;
 
     public LedgerReader(BookKeeper bkc) {
-        bookieClient = bkc.getBookieClient();
+        clientCtx = bkc.getClientCtx();
     }
 
     public static SortedMap<Long, ? extends List<BookieSocketAddress>> bookiesForLedger(final LedgerHandle lh) {
@@ -123,7 +122,7 @@ public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object
         List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
-            bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, readEntryCallback,
+            clientCtx.getBookieClient().readEntry(ensemble.get(idx), lh.getId(), eid, readEntryCallback,
                                    ensemble.get(idx), BookieProtocol.FLAG_NONE);
         }
     }
@@ -143,7 +142,7 @@ public void forwardReadEntriesFromLastConfirmed(final LedgerHandle lh,
         final FutureEventListener<LedgerEntries> readListener = new FutureEventListener<LedgerEntries>() {
 
             private void readNext(long entryId) {
-                PendingReadOp op = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, false);
+                PendingReadOp op = new PendingReadOp(lh, clientCtx, entryId, entryId, false);
                 op.future().whenComplete(this);
                 op.submit();
             }
@@ -193,12 +192,12 @@ public void onFailure(Throwable throwable) {
             }
 
             long entryId = recoveryData.getLastAddConfirmed();
-            PendingReadOp op = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, false);
+            PendingReadOp op = new PendingReadOp(lh, clientCtx, entryId, entryId, false);
             op.future().whenComplete(readListener);
             op.submit();
         };
         // Read Last AddConfirmed
-        new ReadLastConfirmedOp(lh, readLACCallback).initiate();
+        new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), readLACCallback).initiate();
     }
 
     public void readLacs(final LedgerHandle lh, long eid,
@@ -228,7 +227,7 @@ public void readLacs(final LedgerHandle lh, long eid,
         List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
-            bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, readEntryCallback,
+            clientCtx.getBookieClient().readEntry(ensemble.get(idx), lh.getId(), eid, readEntryCallback,
                                    ensemble.get(idx), BookieProtocol.FLAG_NONE);
         }
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services