You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2018/06/01 23:28:32 UTC

[bookkeeper] branch master updated: Issue #1409: Added server side backpressure (@bug W-3651831@) (#1410)

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 319f761  Issue #1409: Added server side backpressure (@bug W-3651831@) (#1410)
319f761 is described below

commit 319f761ca1769d0b35aa70c99b3a1ea8795acaf8
Author: Andrey Yegorov <dl...@users.noreply.github.com>
AuthorDate: Fri Jun 1 16:28:17 2018 -0700

    Issue #1409: Added server side backpressure (@bug W-3651831@) (#1410)
    
    (@bug W-3651831@) backpressure: server-side backpressure
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   8 +
 .../apache/bookkeeper/bookie/EntryMemTable.java    |  38 +-
 .../bookie/EntryMemTableWithParallelFlusher.java   |   1 +
 .../java/org/apache/bookkeeper/bookie/Journal.java |  23 +-
 .../apache/bookkeeper/bookie/JournalChannel.java   |  18 +-
 .../apache/bookkeeper/bookie/LedgerStorage.java    |   6 +
 .../bookkeeper/bookie/SlowBufferedChannel.java     |  93 ++++
 .../bookkeeper/bookie/SortedLedgerStorage.java     | 125 +++++-
 .../bookkeeper/conf/ClientConfiguration.java       |   1 +
 .../bookkeeper/conf/ServerConfiguration.java       | 152 ++++++-
 .../bookkeeper/processor/RequestProcessor.java     |   1 -
 .../apache/bookkeeper/proto/BookieNettyServer.java |   5 +
 .../bookkeeper/proto/BookieRequestProcessor.java   | 196 +++++++++
 .../org/apache/bookkeeper/proto/BookieServer.java  |  11 +
 .../bookkeeper/proto/GetBookieInfoProcessorV3.java |  10 +-
 .../bookkeeper/proto/PacketProcessorBaseV3.java    |  36 +-
 .../bookkeeper/proto/PerChannelBookieClient.java   |  18 +-
 .../bookkeeper/proto/ReadEntryProcessorV3.java     |   3 +
 .../bookkeeper/proto/WriteEntryProcessorV3.java    |   8 +
 .../org/apache/bookkeeper/util/StringUtils.java    |  18 +
 .../org/apache/bookkeeper/bookie/EntryLogTest.java |   5 +-
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |   9 +-
 .../bookie/SlowInterleavedLedgerStorage.java       | 140 ++++++
 .../bookie/SlowSortedLedgerStorage.java}           |  31 +-
 .../bookie/SortedLedgerStorageCheckpointTest.java  |   6 +-
 .../bookkeeper/proto/BookieBackpressureTest.java   | 469 +++++++++++++++++++++
 .../proto/ForceLedgerProcessorV3Test.java          |   1 +
 .../proto/TestBookieRequestProcessor.java          |   8 +-
 .../proto/WriteEntryProcessorV3Test.java           |  40 ++
 .../main/resources/bookkeeper/findbugsExclude.xml  |   6 +
 30 files changed, 1394 insertions(+), 92 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d488bc9..19db7e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -77,6 +77,13 @@ public interface BookKeeperServerStats {
     String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
     String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
 
+    String ADD_ENTRY_IN_PROGRESS = "ADD_ENTRY_IN_PROGRESS";
+    String ADD_ENTRY_BLOCKED = "ADD_ENTRY_BLOCKED";
+    String ADD_ENTRY_BLOCKED_WAIT = "ADD_ENTRY_BLOCKED_WAIT";
+    String READ_ENTRY_IN_PROGRESS = "READ_ENTRY_IN_PROGRESS";
+    String READ_ENTRY_BLOCKED = "READ_ENTRY_BLOCKED";
+    String READ_ENTRY_BLOCKED_WAIT = "READ_ENTRY_BLOCKED_WAIT";
+
     //
     // Journal Stats (scoped under SERVER_SCOPE)
     //
@@ -137,6 +144,7 @@ public interface BookKeeperServerStats {
     String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
     String SKIP_LIST_FLUSH_BYTES = "SKIP_LIST_FLUSH_BYTES";
     String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING";
+    String SKIP_LIST_THROTTLING_LATENCY = "SKIP_LIST_THROTTLING_LATENCY";
     String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR";
     String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS";
     String PENDING_GET_FILE_INFO = "PENDING_GET_FILE_INFO";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index 70f437c..7328398 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -24,10 +24,12 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_GET_E
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_PUT_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_SNAPSHOT;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING_LATENCY;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -103,6 +105,7 @@ public class EntryMemTable implements AutoCloseable{
     final AtomicLong size;
 
     final long skipListSizeLimit;
+    final Semaphore skipListSemaphore;
 
     SkipListArena allocator;
 
@@ -119,6 +122,7 @@ public class EntryMemTable implements AutoCloseable{
     private final OpStatsLogger getEntryStats;
     final Counter flushBytesCounter;
     private final Counter throttlingCounter;
+    private final OpStatsLogger throttlingStats;
 
     /**
     * Constructor.
@@ -136,12 +140,22 @@ public class EntryMemTable implements AutoCloseable{
         // skip list size limit
         this.skipListSizeLimit = conf.getSkipListSizeLimit();
 
+        if (skipListSizeLimit > (Integer.MAX_VALUE - 1) / 2) {
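+            // The semaphore below takes an int number of permits and is sized at 2 * skipListSizeLimit,
+            // so the limit is capped at (Integer.MAX_VALUE - 1) / 2 bytes, i.e. 2*1023MB for the mem table.
+            // Consider a semaphore with a long number of permits; until then 1023MB should be enough (tm).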
+            throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
+        }
+        // double the size for snapshot in progress + incoming data
+        this.skipListSemaphore = new Semaphore((int) skipListSizeLimit * 2);
+
         // Stats
         this.snapshotStats = statsLogger.getOpStatsLogger(SKIP_LIST_SNAPSHOT);
         this.putEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_PUT_ENTRY);
         this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY);
         this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES);
         this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING);
+        this.throttlingStats = statsLogger.getOpStatsLogger(SKIP_LIST_THROTTLING_LATENCY);
     }
 
     void dump() {
@@ -264,6 +278,7 @@ public class EntryMemTable implements AutoCloseable{
             }
         }
 
+        skipListSemaphore.release((int) size);
         return size;
     }
 
@@ -286,18 +301,6 @@ public class EntryMemTable implements AutoCloseable{
     }
 
     /**
-     * Throttling writer w/ 1 ms delay.
-     */
-    private void throttleWriters() {
-        try {
-            Thread.sleep(1);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        throttlingCounter.inc();
-    }
-
-    /**
      * Write an update.
      *
      * @param entry
@@ -314,11 +317,18 @@ public class EntryMemTable implements AutoCloseable{
                 Checkpoint cp = snapshot();
                 if ((null != cp) || (!previousFlushSucceeded.get())) {
                     cb.onSizeLimitReached(cp);
-                } else {
-                    throttleWriters();
                 }
             }
 
+            final int len = entry.remaining();
+            if (!skipListSemaphore.tryAcquire(len)) {
+                throttlingCounter.inc();
+                final long throttlingStartTimeNanos = MathUtils.nowInNano();
+                skipListSemaphore.acquireUninterruptibly(len);
+                throttlingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos),
+                        TimeUnit.NANOSECONDS);
+            }
+
             this.lock.readLock().lock();
             try {
                 EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
index a3849e9..4f2cf02 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
@@ -144,6 +144,7 @@ class EntryMemTableWithParallelFlusher extends EntryMemTable {
                 }
             }
         }
+        skipListSemaphore.release(flushedSize.intValue());
         return flushedSize.longValue();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 71477c0..1da0435 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -36,6 +36,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -77,6 +78,19 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     }
 
     /**
+     * For testability.
+     */
+    @FunctionalInterface
+    public interface BufferedChannelBuilder {
+        BufferedChannelBuilder DEFAULT_BCBUILDER =
+                (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity);
+
+        BufferedChannel create(FileChannel fc, int capacity) throws IOException;
+    }
+
+
+
+    /**
      * List all journal ids by a specified journal id filer.
      *
      * @param journalDir journal dir
@@ -935,11 +949,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             while (true) {
                 // new journal file to write
                 if (null == logFile) {
+
                     logId = logId + 1;
 
                     journalCreationWatcher.reset().start();
                     logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
-                                        journalAlignmentSize, removePagesFromCache, journalFormatVersionToWrite);
+                                        journalAlignmentSize, removePagesFromCache,
+                                        journalFormatVersionToWrite, getBufferedChannelBuilder());
+
                     journalCreationStats.registerSuccessfulEvent(
                             journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
 
@@ -1121,6 +1138,10 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         LOG.info("Journal exited loop!");
     }
 
+    public BufferedChannelBuilder getBufferedChannelBuilder() {
+        return BufferedChannelBuilder.DEFAULT_BCBUILDER;
+    }
+
     /**
      * Shuts down the journal.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index 6d5cae0..507c933 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -93,7 +93,8 @@ class JournalChannel implements Closeable {
     // Open journal for scanning starting from given position.
     JournalChannel(File journalDirectory, long logId,
                    long preAllocSize, int writeBufferSize, long position) throws IOException {
-         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5);
+         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE,
+                 position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
     }
 
     // Open journal to write
@@ -101,11 +102,20 @@ class JournalChannel implements Closeable {
                    long preAllocSize, int writeBufferSize, int journalAlignSize,
                    boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
         this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
-             START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite);
+             fRemoveFromPageCache, formatVersionToWrite, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
+    }
+
+    JournalChannel(File journalDirectory, long logId,
+                   long preAllocSize, int writeBufferSize, int journalAlignSize,
+                   boolean fRemoveFromPageCache, int formatVersionToWrite,
+                   Journal.BufferedChannelBuilder bcBuilder) throws IOException {
+        this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
+                START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder);
     }
 
     /**
      * Create a journal file.
+     * Allows injection of BufferedChannelBuilder for testing purposes.
      *
      * @param journalDirectory
      *          directory to store the journal file.
@@ -128,7 +138,7 @@ class JournalChannel implements Closeable {
     private JournalChannel(File journalDirectory, long logId,
                            long preAllocSize, int writeBufferSize, int journalAlignSize,
                            long position, boolean fRemoveFromPageCache,
-                           int formatVersionToWrite) throws IOException {
+                           int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException {
         this.journalAlignSize = journalAlignSize;
         this.zeros = ByteBuffer.allocate(journalAlignSize);
         this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
@@ -160,7 +170,7 @@ class JournalChannel implements Closeable {
             bb.clear();
             fc.write(bb);
 
-            bc = new BufferedChannel(fc, writeBufferSize);
+            bc = bcBuilder.create(fc, writeBufferSize);
             forceWrite(true);
             nextPrealloc = this.preAllocSize;
             fc.write(zeros, nextPrealloc - journalAlignSize);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index dcfac31..34e32b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -169,4 +169,10 @@ public interface LedgerStorage {
     void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException;
 
     ByteBuf getExplicitLac(long ledgerId);
+
+    // for testability
+    default LedgerStorage getUnderlyingLedgerStorage() {
+        return this;
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
new file mode 100644
index 0000000..9fdc34c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
@@ -0,0 +1,93 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Strictly for testing.
+ * Has to live alongside the prod code so that Journal can inject it in tests.
+ */
+public class SlowBufferedChannel extends BufferedChannel {
+    public volatile long getDelay = 0;
+    public volatile long addDelay = 0;
+    public volatile long flushDelay = 0;
+
+    public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException {
+        super(fc, capacity);
+    }
+
+    public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+        super(fc, writeCapacity, readCapacity);
+    }
+
+    public void setAddDelay(long delay) {
+        addDelay = delay;
+    }
+
+    public void setGetDelay(long delay) {
+        getDelay = delay;
+    }
+
+    public void setFlushDelay(long delay) {
+        flushDelay = delay;
+    }
+
+    @Override
+    public synchronized void write(ByteBuf src) throws IOException {
+        delayMs(addDelay);
+        super.write(src);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        delayMs(flushDelay);
+        super.flush();
+    }
+
+    @Override
+    public long forceWrite(boolean forceMetadata) throws IOException {
+        delayMs(flushDelay);
+        return super.forceWrite(forceMetadata);
+    }
+
+    @Override
+    public synchronized int read(ByteBuf dest, long pos) throws IOException {
+        delayMs(getDelay);
+        return super.read(dest, pos);
+    }
+
+    private static void delayMs(long delay) {
+        if (delay < 1) {
+            return;
+        }
+        try {
+            TimeUnit.MILLISECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            //noop
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 815c65e..5c4f75a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -41,16 +42,23 @@ import org.slf4j.LoggerFactory;
  * entries will be first added into a {@code MemTable}, and then be flushed back to the
  * {@code InterleavedLedgerStorage} when the {@code MemTable} becomes full.
  */
-public class SortedLedgerStorage extends InterleavedLedgerStorage
-        implements LedgerStorage, CacheCallback, SkipListFlusher {
+public class SortedLedgerStorage
+        implements LedgerStorage, CacheCallback, SkipListFlusher,
+            CompactableLedgerStorage, EntryLogger.EntryLogListener {
     private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
 
     EntryMemTable memTable;
     private ScheduledExecutorService scheduler;
     private StateManager stateManager;
+    private final InterleavedLedgerStorage interleavedLedgerStorage;
 
     public SortedLedgerStorage() {
-        super();
+        this(new InterleavedLedgerStorage());
+    }
+
+    @VisibleForTesting
+    protected SortedLedgerStorage(InterleavedLedgerStorage ils) {
+        interleavedLedgerStorage = ils;
     }
 
     @Override
@@ -63,7 +71,8 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
                            Checkpointer checkpointer,
                            StatsLogger statsLogger)
             throws IOException {
-        super.initialize(
+
+        interleavedLedgerStorage.initialize(
             conf,
             ledgerManager,
             ledgerDirsManager,
@@ -72,6 +81,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
             checkpointSource,
             checkpointer,
             statsLogger);
+
         if (conf.isEntryLogPerLedgerEnabled()) {
             this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
         } else {
@@ -96,7 +106,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
         } catch (IOException e) {
             LOG.error("Exception thrown while flushing ledger cache.", e);
         }
-        super.start();
+        interleavedLedgerStorage.start();
     }
 
     @Override
@@ -111,30 +121,50 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
         } catch (Exception e) {
             LOG.error("Error while closing the memtable", e);
         }
-        super.shutdown();
+        interleavedLedgerStorage.shutdown();
     }
 
     @Override
     public boolean ledgerExists(long ledgerId) throws IOException {
         // Done this way because checking the skip list is an O(logN) operation compared to
         // the O(1) for the ledgerCache.
-        if (!super.ledgerExists(ledgerId)) {
+        if (!interleavedLedgerStorage.ledgerExists(ledgerId)) {
             EntryKeyValue kv = memTable.getLastEntry(ledgerId);
             if (null == kv) {
-                return super.ledgerExists(ledgerId);
+                return interleavedLedgerStorage.ledgerExists(ledgerId);
             }
         }
         return true;
     }
 
     @Override
+    public boolean setFenced(long ledgerId) throws IOException {
+        return interleavedLedgerStorage.setFenced(ledgerId);
+    }
+
+    @Override
+    public boolean isFenced(long ledgerId) throws IOException {
+        return interleavedLedgerStorage.isFenced(ledgerId);
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        interleavedLedgerStorage.setMasterKey(ledgerId, masterKey);
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        return interleavedLedgerStorage.readMasterKey(ledgerId);
+    }
+
+    @Override
     public long addEntry(ByteBuf entry) throws IOException {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         long lac = entry.getLong(entry.readerIndex() + 16);
 
         memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this);
-        ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+        interleavedLedgerStorage.ledgerCache.updateLastAddConfirmed(ledgerId, lac);
         return entryId;
     }
 
@@ -149,7 +179,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
             return kv.getValueAsByteBuffer();
         }
         // If it doesn't exist in the skip list, then fallback to the ledger cache+index.
-        return super.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+        return interleavedLedgerStorage.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
     }
 
     @Override
@@ -159,13 +189,13 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
         }
         ByteBuf buffToRet;
         try {
-            buffToRet = super.getEntry(ledgerId, entryId);
+            buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
         } catch (Bookie.NoEntryException nee) {
             EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
             if (null == kv) {
                 // The entry might have been flushed since we last checked, so query the ledger cache again.
                 // If the entry truly doesn't exist, then this will throw a NoEntryException
-                buffToRet = super.getEntry(ledgerId, entryId);
+                buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
             } else {
                 buffToRet = kv.getValueAsByteBuffer();
             }
@@ -175,22 +205,55 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
     }
 
     @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        return interleavedLedgerStorage.getLastAddConfirmed(ledgerId);
+    }
+
+    @Override
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        return interleavedLedgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
+    }
+
+    @Override
     public void checkpoint(final Checkpoint checkpoint) throws IOException {
         long numBytesFlushed = memTable.flush(this, checkpoint);
-        entryLogger.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
-        super.checkpoint(checkpoint);
+        interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
+        interleavedLedgerStorage.checkpoint(checkpoint);
+    }
+
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        interleavedLedgerStorage.deleteLedger(ledgerId);
+    }
+
+    @Override
+    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+        interleavedLedgerStorage.registerLedgerDeletionListener(listener);
+    }
+
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+        interleavedLedgerStorage.setExplicitlac(ledgerId, lac);
+    }
+
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
+        return interleavedLedgerStorage.getExplicitLac(ledgerId);
     }
 
     @Override
     public void process(long ledgerId, long entryId,
                         ByteBuf buffer) throws IOException {
-        processEntry(ledgerId, entryId, buffer, false);
+        interleavedLedgerStorage.processEntry(ledgerId, entryId, buffer, false);
     }
 
     @Override
     public void flush() throws IOException {
         memTable.flush(this, Checkpoint.MAX);
-        super.flush();
+        interleavedLedgerStorage.flush();
     }
 
     // CacheCallback functions.
@@ -212,10 +275,10 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
             public void run() {
                 try {
                     LOG.info("Started flushing mem table.");
-                    entryLogger.prepareEntryMemTableFlush();
+                    interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
                     memTable.flush(SortedLedgerStorage.this);
-                    if (entryLogger.commitEntryMemTableFlush()) {
-                        checkpointer.startCheckpoint(cp);
+                    if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
+                        interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
                     }
                 } catch (Exception e) {
                     stateManager.transitionToReadOnlyMode();
@@ -237,4 +300,28 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
         return (BookieStateManager) stateManager;
     }
 
+    @Override
+    public EntryLogger getEntryLogger() {
+        return interleavedLedgerStorage.getEntryLogger();
+    }
+
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+        return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
+    }
+
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        interleavedLedgerStorage.updateEntriesLocations(locations);
+    }
+
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        interleavedLedgerStorage.flushEntriesLocationsIndex();
+    }
+
+    @Override
+    public LedgerStorage getUnderlyingLedgerStorage() {
+        return interleavedLedgerStorage;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 1630d1a..9f09c37 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -125,6 +125,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
     protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
     protected static final String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
+    // backpressure configuration
     protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
 
     // Bookie health check settings
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d51e93d..faf671d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -81,6 +81,12 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String JOURNAL_ALIGNMENT_SIZE = "journalAlignmentSize";
     protected static final String NUM_JOURNAL_CALLBACK_THREADS = "numJournalCallbackThreads";
     protected static final String JOURNAL_FORMAT_VERSION_TO_WRITE = "journalFormatVersionToWrite";
+    // backpressure control
+    protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
+    protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
+    protected static final String CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT = "closeChannelOnResponseTimeout";
+    protected static final String WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE = "waitTimeoutOnResponseBackpressureMs";
+
     // Bookie Parameters
     protected static final String BOOKIE_PORT = "bookiePort";
     protected static final String LISTENING_INTERFACE = "listeningInterface";
@@ -93,10 +99,12 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String LEDGER_DIRS = "ledgerDirectories";
     protected static final String INDEX_DIRS = "indexDirectories";
     protected static final String ALLOW_STORAGE_EXPANSION = "allowStorageExpansion";
-    // NIO Parameters
+    // NIO and Netty Parameters
     protected static final String SERVER_TCP_NODELAY = "serverTcpNoDelay";
     protected static final String SERVER_SOCK_KEEPALIVE = "serverSockKeepalive";
     protected static final String SERVER_SOCK_LINGER = "serverTcpLinger";
+    protected static final String SERVER_WRITEBUFFER_LOW_WATER_MARK = "serverWriteBufferLowWaterMark";
+    protected static final String SERVER_WRITEBUFFER_HIGH_WATER_MARK = "serverWriteBufferHighWaterMark";
 
     // Zookeeper Parameters
     protected static final String ZK_RETRY_BACKOFF_START_MS = "zkRetryBackoffStartMs";
@@ -635,6 +643,102 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Get max number of adds in progress. 0 == unlimited.
+     *
+     * @return Max number of adds in progress.
+     */
+    public int getMaxAddsInProgressLimit() {
+        return this.getInt(MAX_ADDS_IN_PROGRESS_LIMIT, 0);
+    }
+
+    /**
+     * Set max number of adds in progress. 0 == unlimited.
+     *
+     * @param value
+     *          max number of adds in progress.
+     * @return server configuration.
+     */
+    public ServerConfiguration setMaxAddsInProgressLimit(int value) {
+        this.setProperty(MAX_ADDS_IN_PROGRESS_LIMIT, value);
+        return this;
+    }
+
+    /**
+     * Get max number of reads in progress. 0 == unlimited.
+     *
+     * @return Max number of reads in progress.
+     */
+    public int getMaxReadsInProgressLimit() {
+        return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 0);
+    }
+
+    /**
+     * Set max number of reads in progress. 0 == unlimited.
+     *
+     * @param value
+     *          max number of reads in progress.
+     * @return server configuration.
+     */
+    public ServerConfiguration setMaxReadsInProgressLimit(int value) {
+        this.setProperty(MAX_READS_IN_PROGRESS_LIMIT, value);
+        return this;
+    }
+
+    /**
+     * Configures the action taken when the server times out sending a response to the client.
+     * true == close the channel and drop response
+     * false == drop response
+     * Requires waitTimeoutOnResponseBackpressureMs >= 0, otherwise ignored.
+     *
+     * @return value indicating if channel should be closed.
+     */
+    public boolean getCloseChannelOnResponseTimeout(){
+        return this.getBoolean(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, false);
+    }
+
+    /**
+     * Configures the action taken when the server times out sending a response to the client.
+     * true == close the channel and drop response
+     * false == drop response
+     * Requires waitTimeoutOnResponseBackpressureMs >= 0, otherwise ignored.
+     *
+     * @param value
+     * @return server configuration.
+     */
+    public ServerConfiguration setCloseChannelOnResponseTimeout(boolean value) {
+        this.setProperty(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, value);
+        return this;
+    }
+
+    /**
+     * Timeout controlling wait on response send in case of unresponsive client
+     * (i.e. client in long GC etc.)
+     *
+     * @return timeout value
+     *        negative value disables the feature
+     *        0 to allow request to fail immediately
+     *        Default is -1 (disabled)
+     */
+    public long getWaitTimeoutOnResponseBackpressureMillis() {
+        return getLong(WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE, -1);
+    }
+
+    /**
+     * Timeout controlling wait on response send in case of unresponsive client
+     * (i.e. client in long GC etc.)
+     *
+     * @param value
+     *        negative value disables the feature
+     *        0 to allow request to fail immediately
+     *        Default is -1 (disabled)
+     * @return server configuration.
+     */
+    public ServerConfiguration setWaitTimeoutOnResponseBackpressureMillis(long value) {
+        setProperty(WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE, value);
+        return this;
+    }
+
+    /**
      * Get bookie port that bookie server listen on.
      *
      * @return bookie port
@@ -1540,6 +1644,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
 
     /**
      * Get skip list data size limitation (default 64MB).
+     * Max value is 1,073,741,823
      *
      * @return skip list data size limitation
      */
@@ -1554,6 +1659,10 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
      * @return server configuration object.
      */
     public ServerConfiguration setSkipListSizeLimit(int size) {
+        if (size > (Integer.MAX_VALUE - 1) / 2) {
+            // gives max of 2*1023MB for mem table (one being checkpointed and still writable).
+            throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
+        }
         setProperty(SKIP_LIST_SIZE_LIMIT, size);
         return this;
     }
@@ -2750,6 +2859,47 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Get server netty channel write buffer low water mark.
+     *
+     * @return netty channel write buffer low water mark.
+     */
+    public int getServerWriteBufferLowWaterMark() {
+        return getInt(SERVER_WRITEBUFFER_LOW_WATER_MARK, 384 * 1024);
+    }
+
+    /**
+     * Set server netty channel write buffer low water mark.
+     *
+     * @param waterMark
+     *          netty channel write buffer low water mark.
+     * @return server configuration.
+     */
+    public ServerConfiguration setServerWriteBufferLowWaterMark(int waterMark) {
+        setProperty(SERVER_WRITEBUFFER_LOW_WATER_MARK, waterMark);
+        return this;
+    }
+
+    /**
+     * Get server netty channel write buffer high water mark.
+     *
+     * @return netty channel write buffer high water mark.
+     */
+    public int getServerWriteBufferHighWaterMark() {
+        return getInt(SERVER_WRITEBUFFER_HIGH_WATER_MARK, 512 * 1024);
+    }
+
+    /**
+     * Set server netty channel write buffer high water mark.
+     *
+     * @param waterMark
+     *          netty channel write buffer high water mark.
+     * @return server configuration.
+     */
+    public ServerConfiguration setServerWriteBufferHighWaterMark(int waterMark) {
+        setProperty(SERVER_WRITEBUFFER_HIGH_WATER_MARK, waterMark);
+        return this;
+    }
+    /**
      * Set registration manager class.
      *
      * @param regManagerClass
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 2c8cf7a..8b328ef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -41,5 +41,4 @@ public interface RequestProcessor extends AutoCloseable {
      *          channel received the given request <i>r</i>
      */
     void processRequest(Object r, Channel channel);
-
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index bc303b9..d687a5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -35,6 +35,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.group.ChannelGroup;
@@ -284,6 +285,8 @@ class BookieNettyServer {
             bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                     new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
                             conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+            bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+                    conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
 
             if (eventLoopGroup instanceof EpollEventLoopGroup) {
                 bootstrap.channel(EpollServerSocketChannel.class);
@@ -343,6 +346,8 @@ class BookieNettyServer {
             jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                     new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
                             conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+            jvmBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+                    conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
 
             if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) {
                 jvmBootstrap.channel(LocalServerChannel.class);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 2aebbb9..becfe68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.proto;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
@@ -29,9 +32,12 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_RE
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
@@ -45,6 +51,9 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
 import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
@@ -54,19 +63,25 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
@@ -88,6 +103,7 @@ public class BookieRequestProcessor implements RequestProcessor {
      * worker threads.
      */
     private final ServerConfiguration serverCfg;
+    private final long waitTimeoutOnBackpressureMillis;
 
     /**
      * This is the Bookie instance that is used to handle all read and write requests.
@@ -150,10 +166,27 @@ public class BookieRequestProcessor implements RequestProcessor {
     final OpStatsLogger getBookieInfoRequestStats;
     final OpStatsLogger getBookieInfoStats;
     final OpStatsLogger channelWriteStats;
+    final OpStatsLogger addEntryBlockedStats;
+    final OpStatsLogger readEntryBlockedStats;
+
+    final AtomicInteger addsInProgress = new AtomicInteger(0);
+    final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
+    final AtomicInteger addsBlocked = new AtomicInteger(0);
+    final AtomicInteger readsInProgress = new AtomicInteger(0);
+    final AtomicInteger readsBlocked = new AtomicInteger(0);
+    final AtomicInteger maxReadsInProgress = new AtomicInteger(0);
+
+    final Semaphore addsSemaphore;
+    final Semaphore readsSemaphore;
+
+    // to temporarily blacklist channels
+    final Optional<Cache<Channel, Boolean>> blacklistedChannels;
+    final Consumer<Channel> onResponseTimeout;
 
     public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
             StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
         this.serverCfg = serverCfg;
+        this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
         this.bookie = bookie;
         this.readThreadPool = createExecutor(
                 this.serverCfg.getNumReadWorkerThreads(),
@@ -190,6 +223,25 @@ public class BookieRequestProcessor implements RequestProcessor {
             shFactory.init(NodeType.Server, serverCfg);
         }
 
+        if (waitTimeoutOnBackpressureMillis > 0) {
+            blacklistedChannels = Optional.of(CacheBuilder.newBuilder()
+                    .expireAfterWrite(waitTimeoutOnBackpressureMillis, TimeUnit.MILLISECONDS)
+                    .build());
+        } else {
+            blacklistedChannels = Optional.empty();
+        }
+
+        if (serverCfg.getCloseChannelOnResponseTimeout()) {
+            onResponseTimeout = (ch) -> {
+                LOG.warn("closing channel {} because it was non-writable for longer than {} ms",
+                        ch, waitTimeoutOnBackpressureMillis);
+                ch.close();
+            };
+        } else {
+            // noop
+            onResponseTimeout = (ch) -> {};
+        }
+
         // Expose Stats
         this.statsEnabled = serverCfg.isStatisticsEnabled();
         this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -214,6 +266,126 @@ public class BookieRequestProcessor implements RequestProcessor {
         this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
         this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
         this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
+
+        this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
+        this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+
+        int maxAdds = serverCfg.getMaxAddsInProgressLimit();
+        addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null;
+
+        int maxReads = serverCfg.getMaxReadsInProgressLimit();
+        readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;
+
+        statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return addsInProgress;
+            }
+        });
+
+        statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return addsBlocked;
+            }
+        });
+
+        statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return readsInProgress;
+            }
+        });
+
+        statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return readsBlocked;
+            }
+        });
+
+    }
+
+    protected void onAddRequestStart(Channel channel) {
+        if (addsSemaphore != null) {
+            if (!addsSemaphore.tryAcquire()) {
+                final long throttlingStartTimeNanos = MathUtils.nowInNano();
+                channel.config().setAutoRead(false);
+                LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel);
+                addsBlocked.incrementAndGet();
+                addsSemaphore.acquireUninterruptibly();
+                channel.config().setAutoRead(true);
+                final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
+                LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos);
+                addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+                addsBlocked.decrementAndGet();
+            }
+        }
+        final int curr = addsInProgress.incrementAndGet();
+        maxAddsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    protected void onAddRequestFinish() {
+        addsInProgress.decrementAndGet();
+        if (addsSemaphore != null) {
+            addsSemaphore.release();
+        }
+    }
+
+    protected void onReadRequestStart(Channel channel) {
+        if (readsSemaphore != null) {
+            if (!readsSemaphore.tryAcquire()) {
+                final long throttlingStartTimeNanos = MathUtils.nowInNano();
+                channel.config().setAutoRead(false);
+                LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel);
+                readsBlocked.incrementAndGet();
+                readsSemaphore.acquireUninterruptibly();
+                channel.config().setAutoRead(true);
+                final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
+                LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos);
+                readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+                readsBlocked.decrementAndGet();
+            }
+        }
+        final int curr = readsInProgress.incrementAndGet();
+        maxReadsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    protected void onReadRequestFinish() {
+        readsInProgress.decrementAndGet();
+        if (readsSemaphore != null) {
+            readsSemaphore.release();
+        }
+    }
+
+    @VisibleForTesting
+    int maxAddsInProgressCount() {
+        return maxAddsInProgress.get();
+    }
+
+    @VisibleForTesting
+    int maxReadsInProgressCount() {
+        return maxReadsInProgress.get();
     }
 
     @Override
@@ -572,4 +744,28 @@ public class BookieRequestProcessor implements RequestProcessor {
             }
         }
     }
+
+    public long getWaitTimeoutOnBackpressureMillis() {
+        return waitTimeoutOnBackpressureMillis;
+    }
+
+    public void blacklistChannel(Channel channel) {
+        blacklistedChannels
+                .ifPresent(x -> x.put(channel, true));
+    }
+
+    public void invalidateBlacklist(Channel channel) {
+        blacklistedChannels
+                .ifPresent(x -> x.invalidate(channel));
+    }
+
+    public boolean isBlacklisted(Channel channel) {
+        return blacklistedChannels
+                .map(x -> x.getIfPresent(channel))
+                .orElse(false);
+    }
+
+    public void handleNonWritableChannel(Channel channel) {
+        onResponseTimeout.accept(channel);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index c0214d3..d386b91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -29,6 +29,8 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.security.AccessControlException;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -127,6 +129,10 @@ public class BookieServer {
         running = true;
         deathWatcher = new DeathWatcher(conf);
         deathWatcher.start();
+
+        // fixes test flappers at random places until ISSUE#1400 is resolved
+        // https://github.com/apache/bookkeeper/issues/1400
+        TimeUnit.MILLISECONDS.sleep(250);
     }
 
     @VisibleForTesting
@@ -139,6 +145,11 @@ public class BookieServer {
         return bookie;
     }
 
+    @VisibleForTesting
+    public BookieRequestProcessor getBookieRequestProcessor() {
+        return (BookieRequestProcessor) requestProcessor;
+    }
+
     /**
      * Suspend processing of requests in the bookie (for testing).
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index fc55e4a..d964957 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -54,7 +54,7 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
 
         if (!isVersionCompatible()) {
             getBookieInfoResponse.setStatus(StatusCode.EBADVERSION);
-            requestProcessor.getBookieInfoStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+            requestProcessor.getGetBookieInfoStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
             return getBookieInfoResponse.build();
         }
@@ -66,11 +66,11 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
         long freeDiskSpace = 0L, totalDiskSpace = 0L;
         try {
             if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
-                freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace();
+                freeDiskSpace = requestProcessor.getBookie().getTotalFreeSpace();
                 getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace);
             }
             if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
-                totalDiskSpace = requestProcessor.bookie.getTotalDiskSpace();
+                totalDiskSpace = requestProcessor.getBookie().getTotalDiskSpace();
                 getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace);
             }
             LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace);
@@ -80,7 +80,7 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
         }
 
         getBookieInfoResponse.setStatus(status);
-        requestProcessor.getBookieInfoStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+        requestProcessor.getGetBookieInfoStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                 TimeUnit.NANOSECONDS);
         return getBookieInfoResponse.build();
     }
@@ -98,6 +98,6 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
                 .setGetBookieInfoResponse(getBookieInfoResponse);
         sendResponse(response.getStatus(),
                      response.build(),
-                     requestProcessor.getBookieInfoRequestStats);
+                     requestProcessor.getGetBookieInfoRequestStats());
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 9cd9fd5..7dc29a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
 
 /**
  * A base class for bookkeeper protocol v3 packet processors.
@@ -53,6 +54,40 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
 
     protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
         final long writeNanos = MathUtils.nowInNano();
+
+        final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
+        if (timeOut >= 0 && !channel.isWritable()) {
+            if (!requestProcessor.isBlacklisted(channel)) {
+                synchronized (channel) {
+                    if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) {
+                        final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut);
+                        while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) {
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(1);
+                            } catch (InterruptedException e) {
+                                break;
+                            }
+                        }
+                        if (!channel.isWritable()) {
+                            requestProcessor.blacklistChannel(channel);
+                            requestProcessor.handleNonWritableChannel(channel);
+                        }
+                    }
+                }
+            }
+
+            if (!channel.isWritable()) {
+                LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel,
+                        StringUtils.requestToString(request));
+                requestProcessor.getChannelWriteStats()
+                        .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
+                statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+                return;
+            } else {
+                requestProcessor.invalidateBlacklist(channel);
+            }
+        }
+
         channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
@@ -71,7 +106,6 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
                 }
             }
         });
-
     }
 
     protected boolean isVersionCompatible() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 4d69acf..39436d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -126,6 +126,7 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -939,7 +940,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                            final Object request,
                            final boolean allowFastFail) {
         if (channel == null) {
-            LOG.warn("Operation {} failed: channel == null", requestToString(request));
+            LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
             errorOut(key);
             return;
         }
@@ -952,7 +953,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
         if (allowFastFail && !isWritable) {
             LOG.warn("Operation {} failed: TooManyRequestsException",
-                    requestToString(request));
+                    StringUtils.requestToString(request));
 
             errorOut(key, BKException.Code.TooManyRequestsException);
             return;
@@ -975,22 +976,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
             channel.writeAndFlush(request, promise);
         } catch (Throwable e) {
-            LOG.warn("Operation {} failed", requestToString(request), e);
+            LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
-    private static String requestToString(Object request) {
-        if (request instanceof BookkeeperProtocol.Request) {
-            BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader();
-            return String.format("Req(txnId=%d,op=%s,version=%s)",
-                                 header.getTxnId(), header.getOperation(),
-                                 header.getVersion());
-        } else {
-            return request.toString();
-        }
-    }
-
     void errorOut(final CompletionKey key) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Removing completion key: {}", key);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 3d2a1ea..12c0201 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -65,6 +65,8 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
                                 BookieRequestProcessor requestProcessor,
                                 ExecutorService fenceThreadPool) {
         super(request, channel, requestProcessor);
+        requestProcessor.onReadRequestStart(channel);
+
         this.readRequest = request.getReadRequest();
         this.ledgerId = readRequest.getLedgerId();
         this.entryId = readRequest.getEntryId();
@@ -313,6 +315,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
         sendResponse(response.getStatus(),
                      response.build(),
                      reqStats);
+        requestProcessor.onReadRequestFinish();
     }
 
     //
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index fe68da4..7747e5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
     public WriteEntryProcessorV3(Request request, Channel channel,
                                  BookieRequestProcessor requestProcessor) {
         super(request, channel, requestProcessor);
+        requestProcessor.onAddRequestStart(channel);
     }
 
     // Returns null if there is no exception thrown
@@ -173,6 +175,12 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
         }
     }
 
+    @Override
+    protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
+        super.sendResponse(code, response, statsLogger);
+        requestProcessor.onAddRequestFinish();
+    }
+
     /**
      * this toString method filters out body and masterKey from the output.
      * masterKey contains the password of the ledger and body is customer data,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index f55edd1..73bf018 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.util;
 
 import java.io.IOException;
 
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
+
 /**
  * Provided utilites for parsing network addresses, ledger-id from node paths
  * etc.
@@ -161,4 +163,20 @@ public class StringUtils {
         }
     }
 
+    /**
+     * Builds a string representation of the request without extra (i.e. binary) data.
+     *
+     * @param request
+     * @return string representation of request
+     */
+    public static String requestToString(Object request) {
+        if (request instanceof BookkeeperProtocol.Request) {
+            BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader();
+            return String.format("Req(txnId=%d,op=%s,version=%s)",
+                    header.getTxnId(), header.getOperation(),
+                    header.getVersion());
+        } else {
+            return request.toString();
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index f5e73a1..fbfe7c9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -303,7 +303,8 @@ public class EntryLogTest {
         Bookie bookie = new Bookie(conf);
         EntryLogger entryLogger = new EntryLogger(conf,
                 bookie.getLedgerDirsManager());
-        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        InterleavedLedgerStorage ledgerStorage =
+                ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage());
         ledgerStorage.entryLogger = entryLogger;
         // Create ledgers
         ledgerStorage.setMasterKey(1, "key".getBytes());
@@ -675,7 +676,7 @@ public class EntryLogTest {
         conf.setLedgerStorageClass(ledgerStorageClass);
         conf.setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled);
         Bookie bookie = new Bookie(conf);
-        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        CompactableLedgerStorage ledgerStorage = (CompactableLedgerStorage) bookie.ledgerStorage;
         Random rand = new Random(0);
 
         if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index ae3b4cd..1a01299 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -89,7 +89,7 @@ public class LedgerCacheTest {
         bookie = new Bookie(conf);
 
         activeLedgers = new SnapshotMap<Long, Boolean>();
-        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache;
+        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()).ledgerCache;
     }
 
     @After
@@ -116,8 +116,8 @@ public class LedgerCacheTest {
         if (ledgerCache != null) {
             ledgerCache.close();
         }
-        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache = new LedgerCacheImpl(
-                conf, activeLedgers, bookie.getIndexDirsManager());
+        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage())
+                .ledgerCache = new LedgerCacheImpl(conf, activeLedgers, bookie.getIndexDirsManager());
         flushThread = new Thread() {
                 public void run() {
                     while (true) {
@@ -275,7 +275,8 @@ public class LedgerCacheTest {
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() });
 
         Bookie bookie = new Bookie(conf);
-        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        InterleavedLedgerStorage ledgerStorage =
+                ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage());
         LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache;
         // Create ledger index file
         ledgerStorage.setMasterKey(1, "key".getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
new file mode 100644
index 0000000..c3a8f63
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -0,0 +1,140 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Strictly for testing.
+ * Has to be in org.apache.bookkeeper.bookie so as not to introduce changes to InterleavedLedgerStorage.
+ */
+public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
+
+    public static final String PROP_SLOW_STORAGE_FLUSH_DELAY = "test.slowStorage.flushDelay";
+    public static final String PROP_SLOW_STORAGE_ADD_DELAY = "test.slowStorage.addDelay";
+    public static final String PROP_SLOW_STORAGE_GET_DELAY = "test.slowStorage.getDelay";
+
+    /**
+     * Strictly for testing.
+     */
+    public static class SlowEntryLogger extends EntryLogger {
+        public volatile long getDelay = 0;
+        public volatile long addDelay = 0;
+        public volatile long flushDelay = 0;
+
+        public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
+                throws IOException {
+            super(conf, ledgerDirsManager, listener);
+        }
+
+        public SlowEntryLogger setAddDelay(long delay) {
+            addDelay = delay;
+            return this;
+        }
+
+        public SlowEntryLogger setGetDelay(long delay) {
+            getDelay = delay;
+            return this;
+        }
+
+        public SlowEntryLogger setFlushDelay(long delay) {
+            flushDelay = delay;
+            return this;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            delayMs(flushDelay);
+            super.flush();
+        }
+
+        @Override
+        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+            delayMs(addDelay);
+            return super.addEntry(ledger, entry, rollLog);
+        }
+
+        @Override
+        public ByteBuf readEntry(long ledgerId, long entryId, long location)
+                throws IOException, Bookie.NoEntryException {
+            delayMs(getDelay);
+            return super.readEntry(ledgerId, entryId, location);
+        }
+
+        private static void delayMs(long delay) {
+            if (delay < 1) {
+                return;
+            }
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                //noop
+            }
+        }
+
+    }
+
+    public SlowInterleavedLedgerStorage() {
+        super();
+    }
+
+    @Override
+    public void initialize(ServerConfiguration conf,
+                           LedgerManager ledgerManager,
+                           LedgerDirsManager ledgerDirsManager,
+                           LedgerDirsManager indexDirsManager,
+                           StateManager stateManager,
+                           CheckpointSource checkpointSource,
+                           Checkpointer checkpointer,
+                           StatsLogger statsLogger)
+            throws IOException {
+        super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
+                stateManager, checkpointSource, checkpointer, statsLogger);
+        // do not want to add these to the config class, so reading them through the "raw" interface
+        long getDelay = conf.getLong(PROP_SLOW_STORAGE_GET_DELAY, 0);
+        long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
+        long flushDelay = conf.getLong(PROP_SLOW_STORAGE_FLUSH_DELAY, 0);
+
+        entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this)
+                .setAddDelay(addDelay)
+                .setGetDelay(getDelay)
+                .setFlushDelay(flushDelay);
+    }
+
+    public void setAddDelay(long delay) {
+        ((SlowEntryLogger) entryLogger).setAddDelay(delay);
+    }
+
+    public void setGetDelay(long delay) {
+        ((SlowEntryLogger) entryLogger).setGetDelay(delay);
+    }
+
+    public void setFlushDelay(long delay) {
+        ((SlowEntryLogger) entryLogger).setFlushDelay(delay);
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
similarity index 61%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
copy to bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
index 2c8cf7a..e12976d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
@@ -1,4 +1,6 @@
-/**
+package org.apache.bookkeeper.bookie;
+
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,28 +20,17 @@
  * under the License.
  *
  */
-package org.apache.bookkeeper.processor;
-
-import io.netty.channel.Channel;
 
 /**
- * A request processor that is used for processing requests at bookie side.
+ * Strictly for unit testing.
  */
-public interface RequestProcessor extends AutoCloseable {
-
-    /**
-     * Close the request processor.
-     */
-    void close();
+public class SlowSortedLedgerStorage extends SortedLedgerStorage {
 
-    /**
-     * Process request.
-     *
-     * @param r
-     *          request to process
-     * @param channel
-     *          channel received the given request <i>r</i>
-     */
-    void processRequest(Object r, Channel channel);
+    public SlowSortedLedgerStorage() {
+        this(new SlowInterleavedLedgerStorage());
+    }
 
+    SlowSortedLedgerStorage(InterleavedLedgerStorage ils) {
+        super(ils);
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index f3ef108..322cdd0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -225,7 +225,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
         EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
                 .getEntryLogManager();
         entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
-        long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
+        long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId();
         long currentLogId = entryLogManager.getCurrentLogId();
         log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
 
@@ -244,8 +244,8 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
         assertEquals(0, storage.memTable.kvmap.size());
         assertTrue(
             "current log " + currentLogId + " contains entries added from memtable should be forced to disk"
-            + " but least unflushed log is " + storage.entryLogger.getLeastUnflushedLogId(),
-            storage.entryLogger.getLeastUnflushedLogId() > currentLogId);
+            + " but least unflushed log is " + storage.getEntryLogger().getLeastUnflushedLogId(),
+            storage.getEntryLogger().getLeastUnflushedLogId() > currentLogId);
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
new file mode 100644
index 0000000..5454a70
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
@@ -0,0 +1,469 @@
+package org.apache.bookkeeper.proto;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.nio.channels.FileChannel;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Journal;
+import org.apache.bookkeeper.bookie.SlowBufferedChannel;
+import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.SlowSortedLedgerStorage;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Tests for backpressure handling on the server side.
+ */
+// PowerMock usage is problematic here due to https://github.com/powermock/powermock/issues/822
+public class BookieBackpressureTest extends BookKeeperClusterTestCase
+        implements AddCallback, ReadCallback, ReadLastConfirmedCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BookieBackpressureTest.class);
+
+    byte[] ledgerPassword = "aaa".getBytes();
+
+    final byte[] data = new byte[8 * 1024];
+
+    // test related constants
+    static final int NUM_ENTRIES_TO_WRITE = 200;
+    static final int ENTRIES_IN_MEMTABLE = 2;
+    static final int MAX_PENDING = 2 * ENTRIES_IN_MEMTABLE + 1;
+    static final int NUM_OF_LEDGERS = 2 * MAX_PENDING;
+
+    DigestType digestType;
+
+    long getDelay;
+    long addDelay;
+    long flushDelay;
+
+    public BookieBackpressureTest() {
+        super(1);
+        this.digestType = DigestType.CRC32;
+
+        baseClientConf.setAddEntryTimeout(100);
+        baseClientConf.setAddEntryQuorumTimeout(100);
+        baseClientConf.setReadEntryTimeout(100);
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        getDelay = 0;
+        addDelay = 0;
+        flushDelay = 0;
+    }
+
+    class SyncObj {
+        long lastConfirmed;
+        volatile int counter;
+        boolean value;
+        AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+        Enumeration<LedgerEntry> ls = null;
+
+        public SyncObj() {
+            counter = 0;
+            lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
+            value = false;
+        }
+
+        void setReturnCode(int rc) {
+            this.rc.compareAndSet(BKException.Code.OK, rc);
+        }
+
+        void setLedgerEntries(Enumeration<LedgerEntry> ls) {
+            this.ls = ls;
+        }
+    }
+
+    private void mockJournal(Bookie bookie, long getDelay, long addDelay, long flushDelay) throws Exception {
+        if (getDelay <= 0 && addDelay <= 0 && flushDelay <= 0) {
+            return;
+        }
+
+        List<Journal> journals = getJournals(bookie);
+        for (int i = 0; i < journals.size(); i++) {
+            Journal mock = spy(journals.get(i));
+            when(mock.getBufferedChannelBuilder()).thenReturn((FileChannel fc, int capacity) ->  {
+                SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity);
+                sbc.setAddDelay(addDelay);
+                sbc.setGetDelay(getDelay);
+                sbc.setFlushDelay(flushDelay);
+                return sbc;
+            });
+
+            journals.set(i, mock);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<Journal> getJournals(Bookie bookie) throws NoSuchFieldException, IllegalAccessException {
+        Field f = bookie.getClass().getDeclaredField("journals");
+        f.setAccessible(true);
+
+        return (List<Journal>) f.get(bookie);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowJournal() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        addDelay = 1;
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowJournalFlush() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        // to increase frequency of flushes
+        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+        flushDelay = 1;
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSlowJournal() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        flushDelay = 1;
+
+        doWritesWithBackpressure(0);
+    }
+
+
+    @Test
+    public void testWriteWithBackpressureSlowJournalFlush() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        // to increase frequency of flushes
+        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+        flushDelay = 1;
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSortedStorage() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        // one for memtable being flushed, one for the part accepting the data
+        assertTrue("for the test, memtable should not keep more entries than allowed",
+                ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
+        bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSortedStorage() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        // one for memtable being flushed, one for the part accepting the data
+        assertTrue("for the test, memtable should not keep more entries than allowed",
+                ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
+        bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testReadsNoBackpressure() throws Exception {
+        //disable backpressure for reads
+        bsConfs.get(0).setMaxReadsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+
+        final BookieRequestProcessor brp = generateDataAndDoReads(0);
+
+        Assert.assertThat("reads in progress should exceed MAX_PENDING",
+                brp.maxReadsInProgressCount(), Matchers.greaterThan(MAX_PENDING));
+    }
+
+   @Test
+    public void testReadsWithBackpressure() throws Exception {
+        //enable backpressure for reads
+        bsConfs.get(0).setMaxReadsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+
+        final BookieRequestProcessor brp = generateDataAndDoReads(0);
+
+        Assert.assertThat("reads in progress should NOT exceed MAX_PENDING ",
+                brp.maxReadsInProgressCount(), Matchers.lessThanOrEqualTo(MAX_PENDING));
+    }
+
+    private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exception {
+        BookieServer bks = bs.get(bkId);
+        bks.shutdown();
+        bks = new BookieServer(bsConfs.get(bkId));
+        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        bks.start();
+        bs.set(bkId, bks);
+
+        LOG.info("creating ledgers");
+        // Create ledgers
+        final int numEntriesForReads = 10;
+        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+            LOG.info("created ledger ID: {}", lhs[i].getId());
+        }
+
+        LOG.info("generating data for reads");
+        final CountDownLatch writesCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS);
+        for (int i = 0; i < numEntriesForReads; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> writesCompleteLatch.countDown(), null);
+            }
+        }
+        writesCompleteLatch.await();
+
+        LOG.info("issue bunch of async reads");
+        final CountDownLatch readsCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS);
+        for (int i = 0; i < numEntriesForReads; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncReadEntries(i, i, (rc, lh, seq, ctx) -> readsCompleteLatch.countDown(), null);
+            }
+        }
+        readsCompleteLatch.await();
+        LOG.info("reads finished");
+
+        return bks.getBookieRequestProcessor();
+    }
+
+    // here we expect that backpressure is disabled and number of writes in progress
+    // will exceed the limit
+    private void doWritesNoBackpressure(final int bkId) throws Exception {
+        BookieServer bks = bs.get(bkId);
+        bks.shutdown();
+        bks = new BookieServer(bsConfs.get(bkId));
+        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        bks.start();
+        bs.set(bkId, bks);
+
+        LOG.info("Creating ledgers");
+        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+            LOG.info("created ledger ID: {}", lhs[i].getId());
+        }
+
+        final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+
+        LOG.info("submitting writes");
+        for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> completeLatch.countDown(), null);
+            }
+        }
+
+        boolean exceededLimit = false;
+        BookieRequestProcessor brp = bks.getBookieRequestProcessor();
+        while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) {
+            int val = brp.maxAddsInProgressCount();
+            if (val > MAX_PENDING) {
+                exceededLimit = true;
+                break;
+            }
+            LOG.info("Waiting until all writes succeeded or maxAddsInProgressCount {} > MAX_PENDING {}",
+                    val, MAX_PENDING);
+        }
+
+        assertTrue("expected to exceed number of pending writes", exceededLimit);
+
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i].close();
+        }
+    }
+
+    // here we expect that backpressure is enabled and number of writes in progress
+    // will never exceed the limit
+    private void doWritesWithBackpressure(final int bkId) throws Exception {
+        BookieServer bks = bs.get(bkId);
+        bks.shutdown();
+        bks = new BookieServer(bsConfs.get(bkId));
+        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        bks.start();
+        bs.set(bkId, bks);
+
+        LOG.info("Creating ledgers");
+        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+            LOG.info("created ledger ID: {}", lhs[i].getId());
+        }
+
+        final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+
+        LOG.info("submitting writes");
+        for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> {
+                    rc.compareAndSet(BKException.Code.OK, rc2);
+                    completeLatch.countDown();
+                }, null);
+            }
+        }
+
+        LOG.info("test submitted all writes");
+        BookieRequestProcessor brp = bks.getBookieRequestProcessor();
+        while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) {
+            int val = brp.maxAddsInProgressCount();
+            assertTrue("writes in progress should not exceed limit, got " + val, val <= MAX_PENDING);
+            LOG.info("Waiting for all writes to succeed, left {} of {}",
+                    completeLatch.getCount(), NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+        }
+
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i].close();
+        }
+    }
+
+
+    @Override
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        SyncObj sync = (SyncObj) ctx;
+        sync.setReturnCode(rc);
+        synchronized (sync) {
+            sync.counter++;
+            sync.notify();
+        }
+    }
+
+    @Override
+    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+        SyncObj sync = (SyncObj) ctx;
+        sync.setLedgerEntries(seq);
+        sync.setReturnCode(rc);
+        synchronized (sync) {
+            sync.value = true;
+            sync.notify();
+        }
+    }
+
+    @Override
+    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+        SyncObj sync = (SyncObj) ctx;
+        sync.setReturnCode(rc);
+        synchronized (sync) {
+            sync.lastConfirmed = lastConfirmed;
+            sync.notify();
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 90a51c8..37d4647 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -74,6 +74,7 @@ public class ForceLedgerProcessorV3Test {
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getForceLedgerStats())
             .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
         when(requestProcessor.getForceLedgerRequestStats())
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index 354fcaf..4ebe01c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -47,6 +47,8 @@ import org.junit.Test;
  */
 public class TestBookieRequestProcessor {
 
+    final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
+
     @Test
     public void testConstructLongPollThreads() throws Exception {
         // long poll threads == read threads
@@ -134,7 +136,7 @@ public class TestBookieRequestProcessor {
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).build();
         Request request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
 
-        WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null);
+        WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
         String toString = writeEntryProcessorV3.toString();
         assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -152,7 +154,7 @@ public class TestBookieRequestProcessor {
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).setFlag(Flag.RECOVERY_ADD).setWriteFlags(0)
                 .build();
         request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
-        writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null);
+        writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
         toString = writeEntryProcessorV3.toString();
         assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -188,7 +190,7 @@ public class TestBookieRequestProcessor {
                 .setMasterKey(ByteString.copyFrom("masterKey".getBytes()))
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).build();
         request = Request.newBuilder().setHeader(header).setWriteLacRequest(writeLacRequest).build();
-        WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, null);
+        WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, requestProcessor);
         toString = writeLacProcessorV3.toString();
         assertFalse("writeLacProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeLacProcessorV3's toString should have filtered out masterKey",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 8f54ddb..df7b153 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -79,10 +79,13 @@ public class WriteEntryProcessorV3Test {
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getAddEntryStats())
             .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
         when(requestProcessor.getAddRequestStats())
             .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+        when(requestProcessor.getChannelWriteStats())
+                .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("CHANNEL_WRITE"));
         processor = new WriteEntryProcessorV3(
             request,
             channel,
@@ -242,4 +245,41 @@ public class WriteEntryProcessorV3Test {
         assertEquals(StatusCode.EOK, response.getStatus());
     }
 
+    @Test // non-writable channel with a 5 ms backpressure wait: no Response may be written to the channel
+    public void testWritesWithClientNotAcceptingReponses() throws Exception {
+        when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(5L);
+        // Stub handleNonWritableChannel so that it closes the channel it is given.
+        doAnswer(invocationOnMock -> {
+            Channel ch = invocationOnMock.getArgument(0);
+            ch.close();
+            return null;
+        }).when(requestProcessor).handleNonWritableChannel(any());
+        // The channel reports itself as not writable on every check.
+        when(channel.isWritable()).thenReturn(false);
+        // The bookie is writable; promise and write calls on the channel return plain mocks.
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> { // addEntry invokes its WriteCallback (argument index 2) inline
+            WriteCallback wc = invocationOnMock.getArgument(2);
+            // Report rc 0 for the ledger id and entry id taken from the request.
+            wc.writeComplete(
+                    0,
+                    request.getAddRequest().getLedgerId(),
+                    request.getAddRequest().getEntryId(),
+                    null,
+                    null);
+            return null;
+        }).when(bookie).addEntry(
+                any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+        // Run the processor synchronously on the test thread.
+        processor.run();
+        // Expect: one addEntry, one handleNonWritableChannel(channel), no Response written, one close().
+        verify(bookie, times(1))
+                .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+        verify(requestProcessor, times(1)).handleNonWritableChannel(channel);
+        verify(channel, times(0)).writeAndFlush(any(Response.class));
+        verify(channel, times(1)).close();
+    }
+
 }
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 7bed0b7..83a39f4 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -62,6 +62,12 @@
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
   <Match>
+    <!-- https://github.com/spotbugs/spotbugs/issues/259 -->
+    <Class name="org.apache.bookkeeper.bookie.InterleavedLedgerStorage" />
+    <Method name="getLastAddConfirmed" />
+    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
+  </Match>
+  <Match>
     <Class name="org.apache.bookkeeper.auth.AuthToken" />
     <Method name="getData" />
     <Bug pattern="EI_EXPOSE_REP" />

-- 
To stop receiving notification emails like this one, please contact
ayegorov@apache.org.