You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2018/04/27 03:12:50 UTC

[bookkeeper] branch master updated: ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load) (#1088)

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c3e8c9  ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load) (#1088)
9c3e8c9 is described below

commit 9c3e8c9f835781507ac833473abc39e821bdbf36
Author: Andrey Yegorov <dl...@users.noreply.github.com>
AuthorDate: Thu Apr 26 20:12:47 2018 -0700

    ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load) (#1088)
    
    * ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load)
---
 .../bookkeeper/client/BookKeeperClientStats.java   |   1 +
 .../org/apache/bookkeeper/client/LedgerHandle.java | 120 +++++++++++++++++
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |   5 +
 .../org/apache/bookkeeper/client/PendingAddOp.java |  11 +-
 .../apache/bookkeeper/client/PendingReadOp.java    |   8 ++
 .../bookkeeper/conf/ClientConfiguration.java       |  33 ++++-
 .../org/apache/bookkeeper/proto/BookieClient.java  |  35 ++++-
 .../proto/DefaultPerChannelBookieClientPool.java   |  18 ++-
 .../bookkeeper/proto/PerChannelBookieClient.java   |  79 +++++++++--
 .../proto/PerChannelBookieClientPool.java          |  11 ++
 .../bookkeeper/client/MockBookKeeperTestCase.java  |  38 ++++--
 .../apache/bookkeeper/client/SlowBookieTest.java   | 150 ++++++++++++++++++++-
 .../proto/TestPerChannelBookieClient.java          |   3 +-
 13 files changed, 478 insertions(+), 34 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 7d0fd5d..83e6421 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -71,6 +71,7 @@ public interface BookKeeperClientStats {
     String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
 
     String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
+    String CLIENT_CHANNEL_WRITE_WAIT = "CLIENT_CHANNEL_WRITE_WAIT";
     String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
     String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
     String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 3c53a03..f72ab85 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -83,6 +84,7 @@ import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.collections4.IteratorUtils;
@@ -114,6 +116,8 @@ public class LedgerHandle implements WriteHandle {
     final EnumSet<WriteFlag> writeFlags;
     ScheduledFuture<?> timeoutFuture = null;
 
+    final long waitForWriteSetMs;
+
     /**
      * Invalid entry id. This value is returned from methods which
      * should return an entry id but there is no valid entry available.
@@ -135,6 +139,7 @@ public class LedgerHandle implements WriteHandle {
     final Counter ensembleChangeCounter;
     final Counter lacUpdateHitsCounter;
     final Counter lacUpdateMissesCounter;
+    private final OpStatsLogger clientChannelWriteWaitStats;
 
     // This empty master key is used when an empty password is provided which is the hash of an empty string
     private static final byte[] emptyLedgerKey;
@@ -155,6 +160,7 @@ public class LedgerHandle implements WriteHandle {
         this.enableParallelRecoveryRead = bk.getConf().getEnableParallelRecoveryRead();
         this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
         this.writeFlags = writeFlags;
+        this.waitForWriteSetMs = bk.getConf().getWaitTimeoutOnBackpressureMillis();
 
         if (metadata.isClosed()) {
             lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -206,6 +212,8 @@ public class LedgerHandle implements WriteHandle {
         ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
         lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
         lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
+        clientChannelWriteWaitStats = bk.getStatsLogger()
+                .getOpStatsLogger(BookKeeperClientStats.CLIENT_CHANNEL_WRITE_WAIT);
         bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
                                           new Gauge<Integer>() {
                                               @Override
@@ -842,6 +850,27 @@ public class LedgerHandle implements WriteHandle {
                                                               boolean isRecoveryRead) {
         PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead);
         if (!bk.isClosed()) {
+            // Waiting on the first one.
+            // This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive
+            // state later after N requests sent.
+            // Unfortunately it seems that alternatives are:
+            // - send reads one-by-one (up to the app)
+            // - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact)
+            // - block worker pool (not good)
+            // Even with this implementation one should be more concerned about OOME when all read responses arrive
+            // or about overloading bookies with these requests than about submission of many small requests.
+            // Naturally one of the solutions would be to submit smaller batches and in this case
+            // current implementation will prevent next batch from starting when bookie is
+            // unresponsive thus helpful enough.
+            DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry);
+            try {
+                if (!waitForWritable(ws, firstEntry, ws.size() - 1, waitForWriteSetMs)) {
+                    op.allowFailFastOnUnwritableChannel();
+                }
+            } finally {
+                ws.recycle();
+            }
+
             bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } else {
             op.future().completeExceptionally(BKException.create(ClientClosedException));
@@ -1066,6 +1095,86 @@ public class LedgerHandle implements WriteHandle {
         doAsyncAddEntry(op);
     }
 
+    /**
+     * Checks that enough bookies of the write set have a writable channel;
+     * at most {@code allowedNonWritableCount} (negative is treated as 0) may not.
+     */
+    private boolean isWritesetWritable(DistributionSchedule.WriteSet writeSet,
+                                       long key, int allowedNonWritableCount) {
+        final int allowed = Math.max(allowedNonWritableCount, 0);
+        final int sz = writeSet.size();
+        final int requiredWritable = sz - allowed;
+
+        int nonWritableCount = 0;
+        for (int i = 0; i < sz; i++) {
+            // the write set holds indices into the ensemble, not positions 0..sz-1
+            final int bookieIndex = writeSet.get(i);
+            if (!bk.getBookieClient().isWritable(metadata.currentEnsemble.get(bookieIndex), key)) {
+                // fail only once the allowance is actually exceeded
+                if (++nonWritableCount > allowed) {
+                    return false;
+                }
+            } else if (i + 1 - nonWritableCount >= requiredWritable) {
+                return true; // enough writable bookies seen, no need to check the rest
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Waits up to {@code durationMs} ms until the write set is writable enough
+     * (see {@code isWritesetWritable}) and records the wait in the stats.
+     *
+     * @param durationMs negative: skip the check; 0: check once, do not wait
+     * @return true if the check is skipped or the write set became writable
+     */
+    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long key,
+                                    int allowedNonWritableCount, long durationMs) {
+        if (durationMs < 0) {
+            return true;
+        }
+
+        final long startTime = MathUtils.nowInNano();
+        boolean success = isWritesetWritable(writeSet, key, allowedNonWritableCount);
+
+        if (!success && durationMs > 0) {
+            final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(durationMs);
+            final int maxBackoff = 4;
+            int backoff = 1;
+            // keep the latest check in success so a successful wait is not reported as a failure
+            while (!(success = isWritesetWritable(writeSet, key, allowedNonWritableCount))) {
+                // compare elapsed time, not absolute values: immune to overflow of start + timeout
+                final long remainingNanos = timeoutNanos - MathUtils.elapsedNanos(startTime);
+                if (remainingNanos <= 0) {
+                    break;
+                }
+                // never sleep past the deadline, but at least 1 ms to avoid spinning
+                final long remainingMs = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
+                try {
+                    TimeUnit.MILLISECONDS.sleep(Math.min(backoff, remainingMs));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    success = isWritesetWritable(writeSet, key, allowedNonWritableCount);
+                    break;
+                }
+                backoff = Math.min(backoff + 1, maxBackoff);
+            }
+            if (backoff > 1) {
+                LOG.info("Spent {} ms waiting for {} writable channels",
+                        MathUtils.elapsedMSec(startTime),
+                        writeSet.size() - allowedNonWritableCount);
+            }
+        }
+
+        final long elapsedNanos = MathUtils.elapsedNanos(startTime);
+        if (success) {
+            clientChannelWriteWaitStats.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+        } else {
+            clientChannelWriteWaitStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+        }
+        return success;
+    }
+
     protected void doAsyncAddEntry(final PendingAddOp op) {
         if (throttler != null) {
             throttler.acquire();
@@ -1110,6 +1219,15 @@ public class LedgerHandle implements WriteHandle {
             return;
         }
 
+        DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
+        try {
+            if (!waitForWritable(ws, op.getEntryId(), 0, waitForWriteSetMs)) {
+                op.allowFailFastOnUnwritableChannel();
+            }
+        } finally {
+            ws.recycle();
+        }
+
         try {
             bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
@@ -1155,6 +1273,7 @@ public class LedgerHandle implements WriteHandle {
             cb.readLastConfirmedComplete(BKException.Code.OK, lastEntryId, ctx);
             return;
         }
+
         ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                 @Override
                 public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
@@ -1166,6 +1285,7 @@ public class LedgerHandle implements WriteHandle {
                     }
                 }
             };
+
         new ReadLastConfirmedOp(this, innercb).initiate();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 0eaf0b5..f67e1d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -237,6 +237,11 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
             return;
         }
 
+        if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
+                op.getEntryId(), 0, waitForWriteSetMs)) {
+            op.allowFailFastOnUnwritableChannel();
+        }
+
         try {
             bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index eb95766..61037f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -80,6 +80,8 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
     boolean callbackTriggered;
     boolean hasRun;
 
+    boolean allowFailFast = false;
+
     static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallbackWithLatency cb, Object ctx) {
         PendingAddOp op = RECYCLER.get();
         op.lh = lh;
@@ -100,6 +102,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         op.callbackTriggered = false;
         op.hasRun = false;
         op.requestTimeNanos = Long.MAX_VALUE;
+        op.allowFailFast = false;
         op.qwcLatency = 0;
         return op;
     }
@@ -113,6 +116,11 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         return this;
     }
 
+    PendingAddOp allowFailFastOnUnwritableChannel() {
+        allowFailFast = true;
+        return this;
+    }
+
     void setEntryId(long entryId) {
         this.entryId = entryId;
     }
@@ -129,7 +137,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
 
         lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
-                entryId, toSend, this, bookieIndex, flags);
+                entryId, toSend, this, bookieIndex, flags, allowFailFast);
         ++pendingWriteRequests;
     }
 
@@ -444,6 +452,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         pendingWriteRequests = 0;
         callbackTriggered = false;
         hasRun = false;
+        allowFailFast = false;
 
         recyclerHandle.recycle(this);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a41207d..3852078 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -81,6 +81,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
 
     boolean parallelRead = false;
     final AtomicBoolean complete = new AtomicBoolean(false);
+    boolean allowFailFast = false;
 
     abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable {
 
@@ -391,6 +392,8 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
                 return null;
             }
 
+            // ToDo: pick replica with writable PCBC. ISSUE #1239
+            // https://github.com/apache/bookkeeper/issues/1239
             int replica = nextReplicaIndexToReadFrom;
             int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom);
             nextReplicaIndexToReadFrom++;
@@ -473,6 +476,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
         this.endEntryId = endEntryId;
         this.scheduler = scheduler;
         this.isRecoveryRead = isRecoveryRead;
+        this.allowFailFast = false;
         numPendingEntries = endEntryId - startEntryId + 1;
         requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize()
                 - getLedgerMetadata().getAckQuorumSize() + 1;
@@ -504,6 +508,10 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
         return this;
     }
 
+    void allowFailFastOnUnwritableChannel() {
+        allowFailFast = true;
+    }
+
     public void submit() {
         lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 64fbbe6..7b64d1b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -128,6 +128,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
     protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
     protected static final String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
+    protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
 
     // Bookie health check settings
     protected static final String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled";
@@ -418,7 +419,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
      * @return netty channel write buffer low water mark.
      */
     public int getClientWriteBufferLowWaterMark() {
-        return getInt(CLIENT_WRITEBUFFER_LOW_WATER_MARK, 32 * 1024);
+        return getInt(CLIENT_WRITEBUFFER_LOW_WATER_MARK, 384 * 1024);
     }
 
     /**
@@ -439,7 +440,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
      * @return netty channel write buffer high water mark.
      */
     public int getClientWriteBufferHighWaterMark() {
-        return getInt(CLIENT_WRITEBUFFER_HIGH_WATER_MARK, 64 * 1024);
+        return getInt(CLIENT_WRITEBUFFER_HIGH_WATER_MARK, 512 * 1024);
     }
 
     /**
@@ -800,6 +801,34 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     }
 
     /**
+     * Timeout controlling wait on request send in case of unresponsive bookie(s)
+     * (i.e. bookie in long GC etc.)
+     *
+     * @return timeout value
+     *        negative value disables the feature
+     *        0 to allow request to fail immediately
+     *        Default is -1 (disabled)
+     */
+    public long getWaitTimeoutOnBackpressureMillis() {
+        return getLong(WAIT_TIMEOUT_ON_BACKPRESSURE, -1);
+    }
+
+    /**
+     * Timeout controlling wait on request send in case of unresponsive bookie(s)
+     * (i.e. bookie in long GC etc.)
+     *
+     * @param value
+     *        negative value disables the feature
+     *        0 to allow request to fail immediately
+     *        Default is -1 (disabled)
+     * @return client configuration.
+     */
+    public ClientConfiguration setWaitTimeoutOnBackpressureMillis(long value) {
+        setProperty(WAIT_TIMEOUT_ON_BACKPRESSURE, value);
+        return this;
+    }
+
+    /**
      * Get the number of worker threads. This is the number of
      * worker threads used by bookkeeper client to submit operations.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index c160b20..31c12fc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -149,6 +149,12 @@ public class BookieClient implements PerChannelBookieClientFactory {
         return faultyBookies;
     }
 
+    public boolean isWritable(BookieSocketAddress address, long key) {
+        final PerChannelBookieClientPool pcbcPool = lookupClient(address);
+        // if null, let the write initiate connect or fail with whatever error it produces
+        return pcbcPool == null || pcbcPool.isWritable(key);
+    }
+
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
             SecurityHandlerFactory shFactory) throws SecurityException {
@@ -242,6 +248,18 @@ public class BookieClient implements PerChannelBookieClientFactory {
                          final WriteCallback cb,
                          final Object ctx,
                          final int options) {
+        addEntry(addr, ledgerId, masterKey, entryId, toSend, cb, ctx, options, false);
+    }
+
+    public void addEntry(final BookieSocketAddress addr,
+                         final long ledgerId,
+                         final byte[] masterKey,
+                         final long entryId,
+                         final ByteBufList toSend,
+                         final WriteCallback cb,
+                         final Object ctx,
+                         final int options,
+                         final boolean allowFastFail) {
         final PerChannelBookieClientPool client = lookupClient(addr);
         if (client == null) {
             completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
@@ -255,7 +273,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
         client.obtain(ChannelReadyForAddEntryCallback.create(
                               this, toSend, ledgerId, entryId, addr,
-                              ctx, cb, options, masterKey),
+                                  ctx, cb, options, masterKey, allowFastFail),
                       ledgerId);
     }
 
@@ -291,11 +309,12 @@ public class BookieClient implements PerChannelBookieClientFactory {
         private WriteCallback cb;
         private int options;
         private byte[] masterKey;
+        private boolean allowFastFail;
 
         static ChannelReadyForAddEntryCallback create(
                 BookieClient bookieClient, ByteBufList toSend, long ledgerId,
                 long entryId, BookieSocketAddress addr, Object ctx,
-                WriteCallback cb, int options, byte[] masterKey) {
+                WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail) {
             ChannelReadyForAddEntryCallback callback = RECYCLER.get();
             callback.bookieClient = bookieClient;
             callback.toSend = toSend;
@@ -306,6 +325,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
             callback.cb = cb;
             callback.options = options;
             callback.masterKey = masterKey;
+            callback.allowFastFail = allowFastFail;
             return callback;
         }
 
@@ -316,7 +336,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
             } else {
                 pcbc.addEntry(ledgerId, masterKey, entryId,
-                              toSend, cb, ctx, options);
+                              toSend, cb, ctx, options, allowFastFail);
             }
 
             toSend.release();
@@ -346,6 +366,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
             cb = null;
             options = -1;
             masterKey = null;
+            allowFastFail = false;
 
             recyclerHandle.recycle(this);
         }
@@ -382,6 +403,12 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
                           final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
+        readEntry(addr, ledgerId, entryId, cb, ctx, flags, masterKey, false);
+    }
+
+    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
+                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
+                          final boolean allowFastFail) {
         final PerChannelBookieClientPool client = lookupClient(addr);
         if (client == null) {
             cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
@@ -393,7 +420,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
             if (rc != BKException.Code.OK) {
                 completeRead(rc, ledgerId, entryId, null, cb, ctx);
             } else {
-                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
+                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
             }
         }, ledgerId);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index e57b8a7..04471d5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -84,14 +84,22 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool,
         }
     }
 
-    @Override
-    public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
+    private PerChannelBookieClient getClient(long key) {
         if (1 == clients.length) {
-            clients[0].connectIfNeededAndDoOp(callback);
-            return;
+            return clients[0];
         }
         int idx = MathUtils.signSafeMod(key, clients.length);
-        clients[idx].connectIfNeededAndDoOp(callback);
+        return clients[idx];
+    }
+
+    @Override
+    public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
+        getClient(key).connectIfNeededAndDoOp(callback);
+    }
+
+    @Override
+    public boolean isWritable(long key) {
+        return getClient(key).isWritable();
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 10fc253..f6ebbfa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -214,6 +214,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     private final ClientAuthProvider.Factory authProviderFactory;
     private final ExtensionRegistry extRegistry;
     private final SecurityHandlerFactory shFactory;
+    private volatile boolean isWritable = true;
 
     public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
                                   BookieSocketAddress addr) throws SecurityException {
@@ -337,7 +338,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             public void disconnect() {
                 Channel c = channel;
                 if (c != null) {
-                    c.close();
+                    c.close().addListener(x -> makeWritable());
                 }
                 LOG.info("authplugin disconnected channel {}", channel);
             }
@@ -457,7 +458,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
         ChannelFuture future = bootstrap.connect(bookieAddr);
         future.addListener(new ConnectionFutureListener(startTime));
-
+        future.addListener(x -> makeWritable());
         return future;
     }
 
@@ -466,6 +467,22 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         close();
     }
 
+    /**
+     *
+     * @return boolean, true if PCBC is writable
+     */
+    public boolean isWritable() {
+        return isWritable;
+    }
+
+    public void setWritable(boolean val) {
+        isWritable = val;
+    }
+
+    private void makeWritable() {
+        setWritable(true);
+    }
+
     void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
         boolean completeOpNow = false;
         int opRc = BKException.Code.OK;
@@ -567,6 +584,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
                   Object ctx, final int options) {
+        addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options, false);
+    }
+
+    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
+                  Object ctx, final int options, boolean allowFastFail) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
@@ -623,7 +645,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             toSend.release();
             return;
         } else {
-            writeAndFlush(c, completionKey, request);
+            // addEntry times out on backpressure
+            writeAndFlush(c, completionKey, request, allowFastFail);
         }
     }
 
@@ -667,7 +690,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                           ReadEntryCallback cb,
                                           Object ctx) {
         readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
-                          piggyBackEntry, cb, ctx, (short) 0, null);
+                          piggyBackEntry, cb, ctx, (short) 0, null, false);
     }
 
     /**
@@ -678,9 +701,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                           ReadEntryCallback cb,
                           Object ctx,
                           int flags,
-                          byte[] masterKey) {
+                          byte[] masterKey,
+                          boolean allowFastFail) {
         readEntryInternal(ledgerId, entryId, null, null, false,
-                          cb, ctx, (short) flags, masterKey);
+                          cb, ctx, (short) flags, masterKey, allowFastFail);
     }
 
     private void readEntryInternal(final long ledgerId,
@@ -691,7 +715,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                    final ReadEntryCallback cb,
                                    final Object ctx,
                                    int flags,
-                                   byte[] masterKey) {
+                                   byte[] masterKey,
+                                   boolean allowFastFail) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
@@ -765,7 +790,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             }
         }
 
-        writeAndFlush(channel, completionKey, request);
+        writeAndFlush(channel, completionKey, request, allowFastFail);
     }
 
     public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
@@ -867,6 +892,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             }
             toClose = channel;
             channel = null;
+            makeWritable();
         }
         if (toClose != null) {
             ChannelFuture cf = closeChannel(toClose);
@@ -880,18 +906,48 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel {}", c);
         }
+        return c.close().addListener(x -> makeWritable());
+    }
 
-        return c.close();
+    @Override // handler callback for a change in the channel's writability
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        final Channel c = channel; // snapshot the field once; it is set to null elsewhere in this class
+        if (c == null || c.isWritable()) { // no channel at all, or the channel reports itself writable
+            makeWritable(); // helper not shown in this hunk; presumably flags this client as writable again
+        }
+        super.channelWritabilityChanged(ctx); // let the adapter superclass process the event as well
     }
 
     private void writeAndFlush(final Channel channel,
                                final CompletionKey key,
                                final Object request) {
+        writeAndFlush(channel, key, request, false); // delegate to the 4-arg overload with allowFastFail = false
+    }
+
+    private void writeAndFlush(final Channel channel,
+                           final CompletionKey key,
+                           final Object request,
+                           final boolean allowFastFail) {
         if (channel == null) {
+            LOG.warn("Operation {} failed: channel == null", requestToString(request));
             errorOut(key);
             return;
         }
 
+        final boolean isChannelWritable = channel.isWritable();
+        if (isWritable != isChannelWritable) {
+            // isWritable is volatile so simple "isWritable = channel.isWritable()" would be slower
+            isWritable = isChannelWritable;
+        }
+
+        if (allowFastFail && !isWritable) {
+            LOG.warn("Operation {} failed: TooManyRequestsException",
+                    requestToString(request));
+
+            errorOut(key, BKException.Code.TooManyRequestsException);
+            return;
+        }
+
         try {
             final long startTime = MathUtils.nowInNano();
 
@@ -1321,6 +1377,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                         pendingOps = new ArrayDeque<>();
                     }
 
+                    makeWritable();
+
                     for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
                         pendingOp.operationComplete(rc, PerChannelBookieClient.this);
                     }
@@ -2035,6 +2093,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     rc = BKException.Code.OK;
                     channel = future.channel();
                     if (shFactory != null) {
+                        makeWritable();
                         initiateTLS();
                         return;
                     } else {
@@ -2088,6 +2147,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
                 pendingOp.operationComplete(rc, PerChannelBookieClient.this);
             }
+
+            makeWritable();
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 97a6a26..aa7a5e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -41,6 +41,17 @@ public interface PerChannelBookieClientPool {
     void obtain(GenericCallback<PerChannelBookieClient> callback, long key);
 
     /**
+     * Returns whether the client selected by {@code key} is currently considered writable.
+     * Callers are advised to delay/throttle requests to this channel while this returns {@code false}.
+     *
+     * @param key key used to pick the client; presumably the same key accepted by {@code obtain}
+     * @return {@code true} if writable; this default implementation always returns {@code true}
+     */
+    default boolean isWritable(long key) {
+        return true;
+    }
+
+    /**
      * record any read/write error on {@link PerChannelBookieClientPool}.
      */
     void recordError();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index f7ff436..433591c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
 
 import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -68,6 +69,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.Stubber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,6 +161,7 @@ public abstract class MockBookKeeperTestCase {
         when(bk.getLedgerManager()).thenReturn(ledgerManager);
         when(bk.getLedgerIdGenerator()).thenReturn(ledgerIdGenerator);
         when(bk.getReturnRc(anyInt())).thenAnswer(invocationOnMock -> invocationOnMock.getArgument(0));
+        when(bookieClient.isWritable(any(), anyLong())).thenReturn(true);
 
         setupLedgerIdGenerator();
         setupCreateLedgerMetadata();
@@ -186,6 +189,7 @@ public abstract class MockBookKeeperTestCase {
         when(bk.getCreateOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getRecoverAddCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getRecoverReadCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getAddOpUrCounter()).thenReturn(nullStatsLogger.getCounter("mock"));
         return nullStatsLogger;
     }
 
@@ -396,7 +400,7 @@ public abstract class MockBookKeeperTestCase {
 
     @SuppressWarnings("unchecked")
     protected void setupBookieClientReadEntry() {
-        Answer<Void> answer = invokation -> {
+        final Stubber stub = doAnswer(invokation -> {
             Object[] args = invokation.getArguments();
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
             long ledgerId = (Long) args[1];
@@ -439,13 +443,19 @@ public abstract class MockBookKeeperTestCase {
                 }
             });
             return null;
-        };
-        doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+        });
+
+        stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
                 any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
                 any(), anyInt());
-        doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+
+        stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
                 any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
                 any(), anyInt(), any());
+
+        stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+                any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+                any(), anyInt(), any(), anyBoolean());
     }
 
     private byte[] extractEntryPayload(long ledgerId, long entryId, ByteBufList toSend)
@@ -468,7 +478,7 @@ public abstract class MockBookKeeperTestCase {
 
     @SuppressWarnings("unchecked")
     protected void setupBookieClientAddEntry() {
-        doAnswer(invokation -> {
+        final Stubber stub = doAnswer(invokation -> {
             Object[] args = invokation.getArguments();
             BookkeeperInternalCallbacks.WriteCallback callback = (BookkeeperInternalCallbacks.WriteCallback) args[5];
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
@@ -506,11 +516,19 @@ public abstract class MockBookKeeperTestCase {
                 }
             });
             return null;
-        }).when(bookieClient).addEntry(any(BookieSocketAddress.class),
-            anyLong(), any(byte[].class),
-            anyLong(), any(ByteBufList.class),
-            any(BookkeeperInternalCallbacks.WriteCallback.class),
-            any(), anyInt());
+        });
+
+        stub.when(bookieClient).addEntry(any(BookieSocketAddress.class),
+                anyLong(), any(byte[].class),
+                anyLong(), any(ByteBufList.class),
+                any(BookkeeperInternalCallbacks.WriteCallback.class),
+                any(), anyInt());
+
+        stub.when(bookieClient).addEntry(any(BookieSocketAddress.class),
+                anyLong(), any(byte[].class),
+                anyLong(), any(ByteBufList.class),
+                any(BookkeeperInternalCallbacks.WriteCallback.class),
+                any(), anyInt(), anyBoolean());
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 9cf22f3..7c6f9a0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 import java.util.Set;
@@ -46,6 +47,8 @@ import org.slf4j.LoggerFactory;
 public class SlowBookieTest extends BookKeeperClusterTestCase {
     private static final Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);
 
+    final byte[] entry = "Test Entry".getBytes();
+
     public SlowBookieTest() {
         super(4);
         baseConf.setNumAddWorkerThreads(0);
@@ -111,7 +114,6 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
         final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
         final AtomicBoolean finished = new AtomicBoolean(false);
         final AtomicBoolean failTest = new AtomicBoolean(false);
-        final byte[] entry = "Test Entry".getBytes();
         Thread t = new Thread() {
                 public void run() {
                     try {
@@ -126,10 +128,13 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
             };
         t.start();
         final CountDownLatch b0latch = new CountDownLatch(1);
+
         startNewBookie();
         sleepBookie(getBookie(0), b0latch);
+
         Thread.sleep(10000);
         b0latch.countDown();
+
         finished.set(true);
         t.join();
 
@@ -155,6 +160,148 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testSlowBookieAndBackpressureOn() throws Exception { // positive backpressure wait timeout
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setReadTimeout(5)
+                .setAddEntryTimeout(1)
+                .setAddEntryQuorumTimeout(1)
+                .setNumChannelsPerBookie(1)
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setClientWriteBufferLowWaterMark(1)
+                .setClientWriteBufferHighWaterMark(entry.length - 1) // one byte below a single entry's payload size
+                .setWaitTimeoutOnBackpressureMillis(5000); // 5000 ms, longer than the 2000 ms the helper sleeps
+
+        final boolean expectWriteError = false; // no add callback may report a non-OK rc
+        final boolean expectFailedTest = false; // the writer thread must not throw
+
+        LedgerHandle lh = doBackpressureTest(entry, conf, expectWriteError, expectFailedTest, 2000);
+        assertTrue(lh.readLastConfirmed() < 5); // few adds may be confirmed: last confirmed entry id stays below 5
+    }
+
+    @Test
+    public void testSlowBookieAndFastFailOn() throws Exception { // zero backpressure wait timeout
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setReadTimeout(5)
+                .setAddEntryTimeout(1)
+                .setAddEntryQuorumTimeout(1)
+                .setNumChannelsPerBookie(1)
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setClientWriteBufferLowWaterMark(1)
+                .setClientWriteBufferHighWaterMark(2) // fixed 2-byte high-water mark
+                .setWaitTimeoutOnBackpressureMillis(0); // 0 ms: the "fast fail" case this test is named for
+
+        final boolean expectWriteError = true; // at least one add callback must report a non-OK rc
+        final boolean expectFailedTest = false; // the writer thread itself must still not throw
+
+        LedgerHandle lh = doBackpressureTest(entry, conf, expectWriteError, expectFailedTest, 1000);
+        assertTrue(lh.readLastConfirmed() < 5); // few adds may be confirmed: last confirmed entry id stays below 5
+    }
+
+    @Test
+    public void testSlowBookieAndNoBackpressure() throws Exception { // negative backpressure wait timeout
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setReadTimeout(5)
+                .setAddEntryTimeout(1)
+                .setAddEntryQuorumTimeout(1)
+                .setNumChannelsPerBookie(1)
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setClientWriteBufferLowWaterMark(1)
+                .setClientWriteBufferHighWaterMark(entry.length - 1) // one byte below a single entry's payload size
+                .setWaitTimeoutOnBackpressureMillis(-1); // -1: the "no backpressure" case this test is named for
+
+        final boolean expectWriteError = false; // no add callback may report a non-OK rc
+        final boolean expectFailedTest = false; // the writer thread must not throw
+
+        LedgerHandle lh = doBackpressureTest(entry, conf, expectWriteError, expectFailedTest, 4000);
+
+        assertTrue(lh.readLastConfirmed() > 90); // most adds from the helper's writer loop must be confirmed
+    }
+
+    private LedgerHandle doBackpressureTest(byte[] entry, ClientConfiguration conf, // shared test driver
+                                    boolean expectWriteError, boolean expectFailedTest,
+                                    long sleepInMillis) throws Exception {
+        BookKeeper bkc = new BookKeeper(conf); // NOTE(review): not closed here; the returned lh2 was opened via it
+
+        byte[] pwd = new byte[] {};
+        final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
+        lh.addEntry(entry); // one synchronous add before the bookies are put to sleep
+
+        final AtomicBoolean finished = new AtomicBoolean(false); // tells the writer thread to stop
+        final AtomicBoolean failTest = new AtomicBoolean(false); // set if the writer thread throws
+        final AtomicBoolean writeError = new AtomicBoolean(false); // set if any add callback reports a non-OK rc
+        Thread t = new Thread(() -> {
+            try {
+                int count = 0;
+                while (!finished.get()) {
+                    lh.asyncAddEntry(entry, (rc, lh1, entryId, ctx) -> {
+                        if (rc != BKException.Code.OK) { // first failed add stops the loop and is recorded
+                            finished.set(true);
+                            writeError.set(true);
+                        }
+                    }, null);
+                    if (++count > 100) { // stop once count exceeds 100, even when no add has failed
+                        finished.set(true);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("Exception in add entry thread", e);
+                failTest.set(true);
+            }
+        });
+        final CountDownLatch b0latch = new CountDownLatch(1); // counted down below for bookie 0
+        final CountDownLatch b0latch2 = new CountDownLatch(1); // counted down below for bookie 1
+
+
+        sleepBookie(getBookie(0), b0latch); // helper defined outside this diff; presumably stalls until the latch opens
+        sleepBookie(getBookie(1), b0latch2);
+
+        setTargetChannelState(bkc, getBookie(0), 0, false); // force writable=false on the key-0 client of bookie 0
+        setTargetChannelState(bkc, getBookie(1), 0, false); // same for bookie 1
+
+        t.start();
+
+        Thread.sleep(sleepInMillis); // let the writer thread run while both bookies are asleep
+
+        finished.set(true);
+
+        b0latch.countDown(); // release both bookies, then restore the writable flag on their clients
+        b0latch2.countDown();
+        setTargetChannelState(bkc, getBookie(0), 0, true);
+        setTargetChannelState(bkc, getBookie(1), 0, true);
+
+        t.join();
+
+        assertEquals("write error", expectWriteError, writeError.get());
+        assertEquals("test failure", expectFailedTest, failTest.get());
+
+        lh.close();
+
+        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd); // reopen for verification
+        LedgerChecker lc = new LedgerChecker(bkc);
+        final CountDownLatch checklatch = new CountDownLatch(1);
+        final AtomicInteger numFragments = new AtomicInteger(-1); // stays -1 unless checkLedger reports OK
+        lc.checkLedger(lh2, (rc, fragments) -> {
+            LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+            if (rc == BKException.Code.OK) {
+                numFragments.set(fragments.size());
+                LOG.error("Checked ledgers returned {} {}", rc, fragments); // NOTE(review): ERROR log on the OK path
+            }
+            checklatch.countDown();
+        });
+        checklatch.await();
+        assertEquals("There should be no missing fragments", 0, numFragments.get());
+
+        return lh2; // callers assert on lh2.readLastConfirmed()
+    }
+
+    private void setTargetChannelState(BookKeeper bkc, BookieSocketAddress address, // test-only helper
+                                       long key, boolean state) throws Exception {
+        bkc.getBookieClient().lookupClient(address).obtain((rc, pcbc) -> { // obtain the client for (address, key)
+            pcbc.setWritable(state); // NOTE(review): rc is ignored; pcbc is used without checking the obtain result
+        }, key);
+    }
+
+    @Test
     public void testManyBookieFailureWithSlowBookies() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         conf.setReadTimeout(5)
@@ -166,7 +313,6 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
         final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
         final AtomicBoolean finished = new AtomicBoolean(false);
         final AtomicBoolean failTest = new AtomicBoolean(false);
-        final byte[] entry = "Test Entry".getBytes();
         Thread t = new Thread() {
                 public void run() {
                     try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index d96dce1..7dc6277 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -277,7 +277,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
                     return;
                 }
 
-                client.readEntry(1, 1, cb, null, BookieProtocol.FLAG_DO_FENCING, "00000111112222233333".getBytes());
+                client.readEntry(1, 1, cb, null, BookieProtocol.FLAG_DO_FENCING,
+                        "00000111112222233333".getBytes(), false);
             }
         });
 

-- 
To stop receiving notification emails like this one, please contact
ayegorov@apache.org.