You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/06/01 23:28:19 UTC

[GitHub] dlg99 closed pull request #1410: Issue #1409: Added server side backpressure (@bug W-3651831@)

dlg99 closed pull request #1410: Issue #1409: Added server side backpressure (@bug W-3651831@)
URL: https://github.com/apache/bookkeeper/pull/1410
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d488bc96e..19db7e9e7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -77,6 +77,13 @@
     String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
     String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
 
+    String ADD_ENTRY_IN_PROGRESS = "ADD_ENTRY_IN_PROGRESS";
+    String ADD_ENTRY_BLOCKED = "ADD_ENTRY_BLOCKED";
+    String ADD_ENTRY_BLOCKED_WAIT = "ADD_ENTRY_BLOCKED_WAIT";
+    String READ_ENTRY_IN_PROGRESS = "READ_ENTRY_IN_PROGRESS";
+    String READ_ENTRY_BLOCKED = "READ_ENTRY_BLOCKED";
+    String READ_ENTRY_BLOCKED_WAIT = "READ_ENTRY_BLOCKED_WAIT";
+
     //
     // Journal Stats (scoped under SERVER_SCOPE)
     //
@@ -137,6 +144,7 @@
     String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
     String SKIP_LIST_FLUSH_BYTES = "SKIP_LIST_FLUSH_BYTES";
     String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING";
+    String SKIP_LIST_THROTTLING_LATENCY = "SKIP_LIST_THROTTLING_LATENCY";
     String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR";
     String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS";
     String PENDING_GET_FILE_INFO = "PENDING_GET_FILE_INFO";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index 70f437cb3..73283989b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -24,10 +24,12 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_PUT_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_SNAPSHOT;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SKIP_LIST_THROTTLING_LATENCY;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -103,6 +105,7 @@ public boolean equals(Object o) {
     final AtomicLong size;
 
     final long skipListSizeLimit;
+    final Semaphore skipListSemaphore;
 
     SkipListArena allocator;
 
@@ -119,6 +122,7 @@ private EntrySkipList newSkipList() {
     private final OpStatsLogger getEntryStats;
     final Counter flushBytesCounter;
     private final Counter throttlingCounter;
+    private final OpStatsLogger throttlingStats;
 
     /**
     * Constructor.
@@ -136,12 +140,22 @@ public EntryMemTable(final ServerConfiguration conf, final CheckpointSource sour
         // skip list size limit
         this.skipListSizeLimit = conf.getSkipListSizeLimit();
 
+        if (skipListSizeLimit > (Integer.MAX_VALUE - 1) / 2) {
+            // gives 2*1023MB for mem table.
+            // consider a way to create semaphore with long num of permits
+            // until that 1023MB should be enough for everything (tm)
+            throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
+        }
+        // double the size for snapshot in progress + incoming data
+        this.skipListSemaphore = new Semaphore((int) skipListSizeLimit * 2);
+
         // Stats
         this.snapshotStats = statsLogger.getOpStatsLogger(SKIP_LIST_SNAPSHOT);
         this.putEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_PUT_ENTRY);
         this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY);
         this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES);
         this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING);
+        this.throttlingStats = statsLogger.getOpStatsLogger(SKIP_LIST_THROTTLING_LATENCY);
     }
 
     void dump() {
@@ -264,6 +278,7 @@ long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws
             }
         }
 
+        skipListSemaphore.release((int) size);
         return size;
     }
 
@@ -285,18 +300,6 @@ void clearSnapshot(final EntrySkipList keyValues) {
         }
     }
 
-    /**
-     * Throttling writer w/ 1 ms delay.
-     */
-    private void throttleWriters() {
-        try {
-            Thread.sleep(1);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        throttlingCounter.inc();
-    }
-
     /**
      * Write an update.
      *
@@ -314,11 +317,18 @@ public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final
                 Checkpoint cp = snapshot();
                 if ((null != cp) || (!previousFlushSucceeded.get())) {
                     cb.onSizeLimitReached(cp);
-                } else {
-                    throttleWriters();
                 }
             }
 
+            final int len = entry.remaining();
+            if (!skipListSemaphore.tryAcquire(len)) {
+                throttlingCounter.inc();
+                final long throttlingStartTimeNanos = MathUtils.nowInNano();
+                skipListSemaphore.acquireUninterruptibly(len);
+                throttlingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos),
+                        TimeUnit.NANOSECONDS);
+            }
+
             this.lock.readLock().lock();
             try {
                 EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
index a3849e915..4f2cf0229 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
@@ -144,6 +144,7 @@ public void safeRun() {
                 }
             }
         }
+        skipListSemaphore.release(flushedSize.intValue());
         return flushedSize.longValue();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 71477c027..1da043501 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -36,6 +36,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -76,6 +77,19 @@
         boolean accept(long journalId);
     }
 
+    /**
+     * For testability.
+     */
+    @FunctionalInterface
+    public interface BufferedChannelBuilder {
+        BufferedChannelBuilder DEFAULT_BCBUILDER =
+                (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity);
+
+        BufferedChannel create(FileChannel fc, int capacity) throws IOException;
+    }
+
+
+
     /**
      * List all journal ids by a specified journal id filer.
      *
@@ -935,11 +949,14 @@ public void run() {
             while (true) {
                 // new journal file to write
                 if (null == logFile) {
+
                     logId = logId + 1;
 
                     journalCreationWatcher.reset().start();
                     logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
-                                        journalAlignmentSize, removePagesFromCache, journalFormatVersionToWrite);
+                                        journalAlignmentSize, removePagesFromCache,
+                                        journalFormatVersionToWrite, getBufferedChannelBuilder());
+
                     journalCreationStats.registerSuccessfulEvent(
                             journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
 
@@ -1121,6 +1138,10 @@ public void run() {
         LOG.info("Journal exited loop!");
     }
 
+    public BufferedChannelBuilder getBufferedChannelBuilder() {
+        return BufferedChannelBuilder.DEFAULT_BCBUILDER;
+    }
+
     /**
      * Shuts down the journal.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index 6d5cae050..507c933b1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -93,7 +93,8 @@
     // Open journal for scanning starting from given position.
     JournalChannel(File journalDirectory, long logId,
                    long preAllocSize, int writeBufferSize, long position) throws IOException {
-         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5);
+         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE,
+                 position, false, V5, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
     }
 
     // Open journal to write
@@ -101,11 +102,20 @@
                    long preAllocSize, int writeBufferSize, int journalAlignSize,
                    boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
         this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
-             START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite);
+             fRemoveFromPageCache, formatVersionToWrite, Journal.BufferedChannelBuilder.DEFAULT_BCBUILDER);
+    }
+
+    JournalChannel(File journalDirectory, long logId,
+                   long preAllocSize, int writeBufferSize, int journalAlignSize,
+                   boolean fRemoveFromPageCache, int formatVersionToWrite,
+                   Journal.BufferedChannelBuilder bcBuilder) throws IOException {
+        this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
+                START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, bcBuilder);
     }
 
     /**
      * Create a journal file.
+     * Allows injection of BufferedChannelBuilder for testing purposes.
      *
      * @param journalDirectory
      *          directory to store the journal file.
@@ -128,7 +138,7 @@
     private JournalChannel(File journalDirectory, long logId,
                            long preAllocSize, int writeBufferSize, int journalAlignSize,
                            long position, boolean fRemoveFromPageCache,
-                           int formatVersionToWrite) throws IOException {
+                           int formatVersionToWrite, Journal.BufferedChannelBuilder bcBuilder) throws IOException {
         this.journalAlignSize = journalAlignSize;
         this.zeros = ByteBuffer.allocate(journalAlignSize);
         this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
@@ -160,7 +170,7 @@ private JournalChannel(File journalDirectory, long logId,
             bb.clear();
             fc.write(bb);
 
-            bc = new BufferedChannel(fc, writeBufferSize);
+            bc = bcBuilder.create(fc, writeBufferSize);
             forceWrite(true);
             nextPrealloc = this.preAllocSize;
             fc.write(zeros, nextPrealloc - journalAlignSize);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index dcfac3147..34e32b949 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -169,4 +169,10 @@ boolean waitForLastAddConfirmedUpdate(long ledgerId,
     void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException;
 
     ByteBuf getExplicitLac(long ledgerId);
+
+    // for testability
+    default LedgerStorage getUnderlyingLedgerStorage() {
+        return this;
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
new file mode 100644
index 000000000..9fdc34ca7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
@@ -0,0 +1,93 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Strictly for testing.
+ * Has to live alongside production code so that Journal can inject it in tests.
+ */
+public class SlowBufferedChannel extends BufferedChannel {
+    public volatile long getDelay = 0;
+    public volatile long addDelay = 0;
+    public volatile long flushDelay = 0;
+
+    public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException {
+        super(fc, capacity);
+    }
+
+    public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+        super(fc, writeCapacity, readCapacity);
+    }
+
+    public void setAddDelay(long delay) {
+        addDelay = delay;
+    }
+
+    public void setGetDelay(long delay) {
+        getDelay = delay;
+    }
+
+    public void setFlushDelay(long delay) {
+        flushDelay = delay;
+    }
+
+    @Override
+    public synchronized void write(ByteBuf src) throws IOException {
+        delayMs(addDelay);
+        super.write(src);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        delayMs(flushDelay);
+        super.flush();
+    }
+
+    @Override
+    public long forceWrite(boolean forceMetadata) throws IOException {
+        delayMs(flushDelay);
+        return super.forceWrite(forceMetadata);
+    }
+
+    @Override
+    public synchronized int read(ByteBuf dest, long pos) throws IOException {
+        delayMs(getDelay);
+        return super.read(dest, pos);
+    }
+
+    private static void delayMs(long delay) {
+        if (delay < 1) {
+            return;
+        }
+        try {
+            TimeUnit.MILLISECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            //noop
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 815c65e18..5c4f75a22 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -41,16 +42,23 @@
  * entries will be first added into a {@code MemTable}, and then be flushed back to the
  * {@code InterleavedLedgerStorage} when the {@code MemTable} becomes full.
  */
-public class SortedLedgerStorage extends InterleavedLedgerStorage
-        implements LedgerStorage, CacheCallback, SkipListFlusher {
+public class SortedLedgerStorage
+        implements LedgerStorage, CacheCallback, SkipListFlusher,
+            CompactableLedgerStorage, EntryLogger.EntryLogListener {
     private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
 
     EntryMemTable memTable;
     private ScheduledExecutorService scheduler;
     private StateManager stateManager;
+    private final InterleavedLedgerStorage interleavedLedgerStorage;
 
     public SortedLedgerStorage() {
-        super();
+        this(new InterleavedLedgerStorage());
+    }
+
+    @VisibleForTesting
+    protected SortedLedgerStorage(InterleavedLedgerStorage ils) {
+        interleavedLedgerStorage = ils;
     }
 
     @Override
@@ -63,7 +71,8 @@ public void initialize(ServerConfiguration conf,
                            Checkpointer checkpointer,
                            StatsLogger statsLogger)
             throws IOException {
-        super.initialize(
+
+        interleavedLedgerStorage.initialize(
             conf,
             ledgerManager,
             ledgerDirsManager,
@@ -72,6 +81,7 @@ public void initialize(ServerConfiguration conf,
             checkpointSource,
             checkpointer,
             statsLogger);
+
         if (conf.isEntryLogPerLedgerEnabled()) {
             this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
         } else {
@@ -96,7 +106,7 @@ public void start() {
         } catch (IOException e) {
             LOG.error("Exception thrown while flushing ledger cache.", e);
         }
-        super.start();
+        interleavedLedgerStorage.start();
     }
 
     @Override
@@ -111,22 +121,42 @@ public void shutdown() throws InterruptedException {
         } catch (Exception e) {
             LOG.error("Error while closing the memtable", e);
         }
-        super.shutdown();
+        interleavedLedgerStorage.shutdown();
     }
 
     @Override
     public boolean ledgerExists(long ledgerId) throws IOException {
         // Done this way because checking the skip list is an O(logN) operation compared to
         // the O(1) for the ledgerCache.
-        if (!super.ledgerExists(ledgerId)) {
+        if (!interleavedLedgerStorage.ledgerExists(ledgerId)) {
             EntryKeyValue kv = memTable.getLastEntry(ledgerId);
             if (null == kv) {
-                return super.ledgerExists(ledgerId);
+                return interleavedLedgerStorage.ledgerExists(ledgerId);
             }
         }
         return true;
     }
 
+    @Override
+    public boolean setFenced(long ledgerId) throws IOException {
+        return interleavedLedgerStorage.setFenced(ledgerId);
+    }
+
+    @Override
+    public boolean isFenced(long ledgerId) throws IOException {
+        return interleavedLedgerStorage.isFenced(ledgerId);
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        interleavedLedgerStorage.setMasterKey(ledgerId, masterKey);
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        return interleavedLedgerStorage.readMasterKey(ledgerId);
+    }
+
     @Override
     public long addEntry(ByteBuf entry) throws IOException {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
@@ -134,7 +164,7 @@ public long addEntry(ByteBuf entry) throws IOException {
         long lac = entry.getLong(entry.readerIndex() + 16);
 
         memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this);
-        ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+        interleavedLedgerStorage.ledgerCache.updateLastAddConfirmed(ledgerId, lac);
         return entryId;
     }
 
@@ -149,7 +179,7 @@ private ByteBuf getLastEntryId(long ledgerId) throws IOException {
             return kv.getValueAsByteBuffer();
         }
         // If it doesn't exist in the skip list, then fallback to the ledger cache+index.
-        return super.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+        return interleavedLedgerStorage.getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
     }
 
     @Override
@@ -159,13 +189,13 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
         }
         ByteBuf buffToRet;
         try {
-            buffToRet = super.getEntry(ledgerId, entryId);
+            buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
         } catch (Bookie.NoEntryException nee) {
             EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
             if (null == kv) {
                 // The entry might have been flushed since we last checked, so query the ledger cache again.
                 // If the entry truly doesn't exist, then this will throw a NoEntryException
-                buffToRet = super.getEntry(ledgerId, entryId);
+                buffToRet = interleavedLedgerStorage.getEntry(ledgerId, entryId);
             } else {
                 buffToRet = kv.getValueAsByteBuffer();
             }
@@ -174,23 +204,56 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
         return buffToRet;
     }
 
+    @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        return interleavedLedgerStorage.getLastAddConfirmed(ledgerId);
+    }
+
+    @Override
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
+            throws IOException {
+        return interleavedLedgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
+    }
+
     @Override
     public void checkpoint(final Checkpoint checkpoint) throws IOException {
         long numBytesFlushed = memTable.flush(this, checkpoint);
-        entryLogger.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
-        super.checkpoint(checkpoint);
+        interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
+        interleavedLedgerStorage.checkpoint(checkpoint);
+    }
+
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        interleavedLedgerStorage.deleteLedger(ledgerId);
+    }
+
+    @Override
+    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+        interleavedLedgerStorage.registerLedgerDeletionListener(listener);
+    }
+
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+        interleavedLedgerStorage.setExplicitlac(ledgerId, lac);
+    }
+
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
+        return interleavedLedgerStorage.getExplicitLac(ledgerId);
     }
 
     @Override
     public void process(long ledgerId, long entryId,
                         ByteBuf buffer) throws IOException {
-        processEntry(ledgerId, entryId, buffer, false);
+        interleavedLedgerStorage.processEntry(ledgerId, entryId, buffer, false);
     }
 
     @Override
     public void flush() throws IOException {
         memTable.flush(this, Checkpoint.MAX);
-        super.flush();
+        interleavedLedgerStorage.flush();
     }
 
     // CacheCallback functions.
@@ -212,10 +275,10 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
             public void run() {
                 try {
                     LOG.info("Started flushing mem table.");
-                    entryLogger.prepareEntryMemTableFlush();
+                    interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
                     memTable.flush(SortedLedgerStorage.this);
-                    if (entryLogger.commitEntryMemTableFlush()) {
-                        checkpointer.startCheckpoint(cp);
+                    if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
+                        interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
                     }
                 } catch (Exception e) {
                     stateManager.transitionToReadOnlyMode();
@@ -237,4 +300,28 @@ BookieStateManager getStateManager(){
         return (BookieStateManager) stateManager;
     }
 
+    @Override
+    public EntryLogger getEntryLogger() {
+        return interleavedLedgerStorage.getEntryLogger();
+    }
+
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+        return interleavedLedgerStorage.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
+    }
+
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        interleavedLedgerStorage.updateEntriesLocations(locations);
+    }
+
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        interleavedLedgerStorage.flushEntriesLocationsIndex();
+    }
+
+    @Override
+    public LedgerStorage getUnderlyingLedgerStorage() {
+        return interleavedLedgerStorage;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7b64d1b39..0dadc6274 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -128,6 +128,7 @@
     protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
     protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
     protected static final String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
+    // backpressure configuration
     protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
 
     // Bookie health check settings
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d51e93da3..faf671de7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -81,6 +81,12 @@
     protected static final String JOURNAL_ALIGNMENT_SIZE = "journalAlignmentSize";
     protected static final String NUM_JOURNAL_CALLBACK_THREADS = "numJournalCallbackThreads";
     protected static final String JOURNAL_FORMAT_VERSION_TO_WRITE = "journalFormatVersionToWrite";
+    // backpressure control
+    protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
+    protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
+    protected static final String CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT = "closeChannelOnResponseTimeout";
+    protected static final String WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE = "waitTimeoutOnResponseBackpressureMs";
+
     // Bookie Parameters
     protected static final String BOOKIE_PORT = "bookiePort";
     protected static final String LISTENING_INTERFACE = "listeningInterface";
@@ -93,10 +99,12 @@
     protected static final String LEDGER_DIRS = "ledgerDirectories";
     protected static final String INDEX_DIRS = "indexDirectories";
     protected static final String ALLOW_STORAGE_EXPANSION = "allowStorageExpansion";
-    // NIO Parameters
+    // NIO and Netty Parameters
     protected static final String SERVER_TCP_NODELAY = "serverTcpNoDelay";
     protected static final String SERVER_SOCK_KEEPALIVE = "serverSockKeepalive";
     protected static final String SERVER_SOCK_LINGER = "serverTcpLinger";
+    protected static final String SERVER_WRITEBUFFER_LOW_WATER_MARK = "serverWriteBufferLowWaterMark";
+    protected static final String SERVER_WRITEBUFFER_HIGH_WATER_MARK = "serverWriteBufferHighWaterMark";
 
     // Zookeeper Parameters
     protected static final String ZK_RETRY_BACKOFF_START_MS = "zkRetryBackoffStartMs";
@@ -634,6 +642,102 @@ public ServerConfiguration setJournalFormatVersionToWrite(int version) {
         return this;
     }
 
+    /**
+     * Get max number of adds in progress. 0 == unlimited.
+     *
+     * @return Max number of adds in progress.
+     */
+    public int getMaxAddsInProgressLimit() {
+        return this.getInt(MAX_ADDS_IN_PROGRESS_LIMIT, 0);
+    }
+
+    /**
+     * Set max number of adds in progress. 0 == unlimited.
+     *
+     * @param value
+     *          max number of adds in progress.
+     * @return server configuration.
+     */
+    public ServerConfiguration setMaxAddsInProgressLimit(int value) {
+        this.setProperty(MAX_ADDS_IN_PROGRESS_LIMIT, value);
+        return this;
+    }
+
+    /**
+     * Get max number of reads in progress. 0 == unlimited.
+     *
+     * @return Max number of reads in progress.
+     */
+    public int getMaxReadsInProgressLimit() {
+        return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 0);
+    }
+
+    /**
+     * Set max number of reads in progress. 0 == unlimited.
+     *
+     * @param value
+     *          max number of reads in progress.
+     * @return server configuration.
+     */
+    public ServerConfiguration setMaxReadsInProgressLimit(int value) {
+        this.setProperty(MAX_READS_IN_PROGRESS_LIMIT, value);
+        return this;
+    }
+
+    /**
+     * Configures the action taken when the server times out sending a response to the client.
+     * true == close the channel and drop response
+     * false == drop response
+     * Requires waitTimeoutOnResponseBackpressureMs >= 0, otherwise ignored.
+     *
+     * @return value indicating if channel should be closed.
+     */
+    public boolean getCloseChannelOnResponseTimeout(){
+        return this.getBoolean(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, false);
+    }
+
+    /**
+     * Configures the action taken when the server times out sending a response to the client.
+     * true == close the channel and drop response
+     * false == drop response
+     * Requires waitTimeoutOnResponseBackpressureMs >= 0, otherwise ignored.
+     *
+     * @param value
+     * @return server configuration.
+     */
+    public ServerConfiguration setCloseChannelOnResponseTimeout(boolean value) {
+        this.setProperty(CLOSE_CHANNEL_ON_RESPONSE_TIMEOUT, value);
+        return this;
+    }
+
+    /**
+     * Timeout controlling wait on response send in case of unresponsive client
+     * (i.e. client in long GC etc.)
+     *
+     * @return timeout value
+     *        negative value disables the feature
+     *        0 to allow request to fail immediately
+     *        Default is -1 (disabled)
+     */
+    public long getWaitTimeoutOnResponseBackpressureMillis() {
+        return getLong(WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE, -1);
+    }
+
+    /**
+     * Timeout controlling wait on response send in case of unresponsive client
+     * (i.e. client in long GC etc.)
+     *
+     * @param value
+     *        negative value disables the feature
+     *        0 to allow request to fail immediately
+     *        Default is -1 (disabled)
+     * @return server configuration.
+     */
+    public ServerConfiguration setWaitTimeoutOnResponseBackpressureMillis(long value) {
+        setProperty(WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE, value);
+        return this;
+    }
+
     /**
      * Get bookie port that bookie server listen on.
      *
@@ -1540,6 +1644,7 @@ public boolean getSortedLedgerStorageEnabled() {
 
     /**
      * Get skip list data size limitation (default 64MB).
+     * Max value is 1,073,741,823
      *
      * @return skip list data size limitation
      */
@@ -1554,6 +1659,10 @@ public long getSkipListSizeLimit() {
      * @return server configuration object.
      */
     public ServerConfiguration setSkipListSizeLimit(int size) {
+        if (size > (Integer.MAX_VALUE - 1) / 2) {
+            // gives max of 2*1023MB for mem table (one being checkpointed and still writable).
+            throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
+        }
         setProperty(SKIP_LIST_SIZE_LIMIT, size);
         return this;
     }
@@ -2749,6 +2858,47 @@ public ServerConfiguration setIgnoreExtraServerComponentsStartupFailures(boolean
         return this;
     }
 
+    /**
+     * Get server netty channel write buffer low water mark.
+     *
+     * @return netty channel write buffer low water mark.
+     */
+    public int getServerWriteBufferLowWaterMark() {
+        return getInt(SERVER_WRITEBUFFER_LOW_WATER_MARK, 384 * 1024);
+    }
+
+    /**
+     * Set server netty channel write buffer low water mark.
+     *
+     * @param waterMark
+     *          netty channel write buffer low water mark.
+     * @return server configuration.
+     */
+    public ServerConfiguration setServerWriteBufferLowWaterMark(int waterMark) {
+        setProperty(SERVER_WRITEBUFFER_LOW_WATER_MARK, waterMark);
+        return this;
+    }
+
+    /**
+     * Get server netty channel write buffer high water mark.
+     *
+     * @return netty channel write buffer high water mark.
+     */
+    public int getServerWriteBufferHighWaterMark() {
+        return getInt(SERVER_WRITEBUFFER_HIGH_WATER_MARK, 512 * 1024);
+    }
+
+    /**
+     * Set server netty channel write buffer high water mark.
+     *
+     * @param waterMark
+     *          netty channel write buffer high water mark.
+     * @return server configuration.
+     */
+    public ServerConfiguration setServerWriteBufferHighWaterMark(int waterMark) {
+        setProperty(SERVER_WRITEBUFFER_HIGH_WATER_MARK, waterMark);
+        return this;
+    }
     /**
      * Set registration manager class.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 2c8cf7af3..8b328ef7d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -41,5 +41,4 @@
      *          channel received the given request <i>r</i>
      */
     void processRequest(Object r, Channel channel);
-
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index bc303b92d..d687a5c98 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -35,6 +35,7 @@
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.group.ChannelGroup;
@@ -284,6 +285,8 @@ private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddre
             bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                     new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
                             conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+            bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+                    conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
 
             if (eventLoopGroup instanceof EpollEventLoopGroup) {
                 bootstrap.channel(EpollServerSocketChannel.class);
@@ -343,6 +346,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
             jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                     new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
                             conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+            jvmBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+                    conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));
 
             if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) {
                 jvmBootstrap.channel(LocalServerChannel.class);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 2aebbb955..becfe686e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -22,6 +22,9 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_BLOCKED_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_IN_PROGRESS;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
@@ -29,9 +32,12 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_READ;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_FENCE_WAIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_IN_PROGRESS;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_PRE_WAIT;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_READ;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG_POLL_REQUEST;
@@ -45,6 +51,9 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
 import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
@@ -54,19 +63,25 @@
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
@@ -88,6 +103,7 @@
      * worker threads.
      */
     private final ServerConfiguration serverCfg;
+    private final long waitTimeoutOnBackpressureMillis;
 
     /**
      * This is the Bookie instance that is used to handle all read and write requests.
@@ -150,10 +166,27 @@
     final OpStatsLogger getBookieInfoRequestStats;
     final OpStatsLogger getBookieInfoStats;
     final OpStatsLogger channelWriteStats;
+    final OpStatsLogger addEntryBlockedStats;
+    final OpStatsLogger readEntryBlockedStats;
+
+    final AtomicInteger addsInProgress = new AtomicInteger(0);
+    final AtomicInteger maxAddsInProgress = new AtomicInteger(0);
+    final AtomicInteger addsBlocked = new AtomicInteger(0);
+    final AtomicInteger readsInProgress = new AtomicInteger(0);
+    final AtomicInteger readsBlocked = new AtomicInteger(0);
+    final AtomicInteger maxReadsInProgress = new AtomicInteger(0);
+
+    final Semaphore addsSemaphore;
+    final Semaphore readsSemaphore;
+
+    // used to temporarily blacklist channels
+    final Optional<Cache<Channel, Boolean>> blacklistedChannels;
+    final Consumer<Channel> onResponseTimeout;
 
     public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
             StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
         this.serverCfg = serverCfg;
+        this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
         this.bookie = bookie;
         this.readThreadPool = createExecutor(
                 this.serverCfg.getNumReadWorkerThreads(),
@@ -190,6 +223,25 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
             shFactory.init(NodeType.Server, serverCfg);
         }
 
+        if (waitTimeoutOnBackpressureMillis > 0) {
+            blacklistedChannels = Optional.of(CacheBuilder.newBuilder()
+                    .expireAfterWrite(waitTimeoutOnBackpressureMillis, TimeUnit.MILLISECONDS)
+                    .build());
+        } else {
+            blacklistedChannels = Optional.empty();
+        }
+
+        if (serverCfg.getCloseChannelOnResponseTimeout()) {
+            onResponseTimeout = (ch) -> {
+                LOG.warn("closing channel {} because it was non-writable for longer than {} ms",
+                        ch, waitTimeoutOnBackpressureMillis);
+                ch.close();
+            };
+        } else {
+            // noop
+            onResponseTimeout = (ch) -> {};
+        }
+
         // Expose Stats
         this.statsEnabled = serverCfg.isStatisticsEnabled();
         this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -214,6 +266,126 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
         this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
         this.getBookieInfoRequestStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO_REQUEST);
         this.channelWriteStats = statsLogger.getOpStatsLogger(CHANNEL_WRITE);
+
+        this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
+        this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
+
+        int maxAdds = serverCfg.getMaxAddsInProgressLimit();
+        addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null;
+
+        int maxReads = serverCfg.getMaxReadsInProgressLimit();
+        readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;
+
+        statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return addsInProgress;
+            }
+        });
+
+        statsLogger.registerGauge(ADD_ENTRY_BLOCKED, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return addsBlocked;
+            }
+        });
+
+        statsLogger.registerGauge(READ_ENTRY_IN_PROGRESS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return readsInProgress;
+            }
+        });
+
+        statsLogger.registerGauge(READ_ENTRY_BLOCKED, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return readsBlocked;
+            }
+        });
+
+    }
+
+    protected void onAddRequestStart(Channel channel) {
+        if (addsSemaphore != null) {
+            if (!addsSemaphore.tryAcquire()) {
+                final long throttlingStartTimeNanos = MathUtils.nowInNano();
+                channel.config().setAutoRead(false);
+                LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel);
+                addsBlocked.incrementAndGet();
+                addsSemaphore.acquireUninterruptibly();
+                channel.config().setAutoRead(true);
+                final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
+                LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos);
+                addEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+                addsBlocked.decrementAndGet();
+            }
+        }
+        final int curr = addsInProgress.incrementAndGet();
+        maxAddsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    protected void onAddRequestFinish() {
+        addsInProgress.decrementAndGet();
+        if (addsSemaphore != null) {
+            addsSemaphore.release();
+        }
+    }
+
+    protected void onReadRequestStart(Channel channel) {
+        if (readsSemaphore != null) {
+            if (!readsSemaphore.tryAcquire()) {
+                final long throttlingStartTimeNanos = MathUtils.nowInNano();
+                channel.config().setAutoRead(false);
+                LOG.info("Too many read requests in progress, disabling autoread on channel {}", channel);
+                readsBlocked.incrementAndGet();
+                readsSemaphore.acquireUninterruptibly();
+                channel.config().setAutoRead(true);
+                final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos);
+                LOG.info("Re-enabled autoread on channel {} after ReadRequest delay of {} nanos", channel, delayNanos);
+                readEntryBlockedStats.registerSuccessfulEvent(delayNanos, TimeUnit.NANOSECONDS);
+                readsBlocked.decrementAndGet();
+            }
+        }
+        final int curr = readsInProgress.incrementAndGet();
+        maxReadsInProgress.accumulateAndGet(curr, Integer::max);
+    }
+
+    protected void onReadRequestFinish() {
+        readsInProgress.decrementAndGet();
+        if (readsSemaphore != null) {
+            readsSemaphore.release();
+        }
+    }
+
+    @VisibleForTesting
+    int maxAddsInProgressCount() {
+        return maxAddsInProgress.get();
+    }
+
+    @VisibleForTesting
+    int maxReadsInProgressCount() {
+        return maxReadsInProgress.get();
     }
 
     @Override
@@ -572,4 +744,28 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe
             }
         }
     }
+
+    public long getWaitTimeoutOnBackpressureMillis() {
+        return waitTimeoutOnBackpressureMillis;
+    }
+
+    public void blacklistChannel(Channel channel) {
+        blacklistedChannels
+                .ifPresent(x -> x.put(channel, true));
+    }
+
+    public void invalidateBlacklist(Channel channel) {
+        blacklistedChannels
+                .ifPresent(x -> x.invalidate(channel));
+    }
+
+    public boolean isBlacklisted(Channel channel) {
+        return blacklistedChannels
+                .map(x -> x.getIfPresent(channel))
+                .orElse(false);
+    }
+
+    public void handleNonWritableChannel(Channel channel) {
+        onResponseTimeout.accept(channel);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index c0214d338..d386b915c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -29,6 +29,8 @@
 import java.net.UnknownHostException;
 import java.security.AccessControlException;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -127,6 +129,10 @@ public void start() throws IOException, UnavailableException, InterruptedExcepti
         running = true;
         deathWatcher = new DeathWatcher(conf);
         deathWatcher.start();
+
+        // fixes test flappers at random places until ISSUE#1400 is resolved
+        // https://github.com/apache/bookkeeper/issues/1400
+        TimeUnit.MILLISECONDS.sleep(250);
     }
 
     @VisibleForTesting
@@ -139,6 +145,11 @@ public Bookie getBookie() {
         return bookie;
     }
 
+    @VisibleForTesting
+    public BookieRequestProcessor getBookieRequestProcessor() {
+        return (BookieRequestProcessor) requestProcessor;
+    }
+
     /**
      * Suspend processing of requests in the bookie (for testing).
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index fc55e4a77..d96495748 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -54,7 +54,7 @@ private GetBookieInfoResponse getGetBookieInfoResponse() {
 
         if (!isVersionCompatible()) {
             getBookieInfoResponse.setStatus(StatusCode.EBADVERSION);
-            requestProcessor.getBookieInfoStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+            requestProcessor.getGetBookieInfoStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
             return getBookieInfoResponse.build();
         }
@@ -66,11 +66,11 @@ private GetBookieInfoResponse getGetBookieInfoResponse() {
         long freeDiskSpace = 0L, totalDiskSpace = 0L;
         try {
             if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
-                freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace();
+                freeDiskSpace = requestProcessor.getBookie().getTotalFreeSpace();
                 getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace);
             }
             if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
-                totalDiskSpace = requestProcessor.bookie.getTotalDiskSpace();
+                totalDiskSpace = requestProcessor.getBookie().getTotalDiskSpace();
                 getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace);
             }
             LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace);
@@ -80,7 +80,7 @@ private GetBookieInfoResponse getGetBookieInfoResponse() {
         }
 
         getBookieInfoResponse.setStatus(status);
-        requestProcessor.getBookieInfoStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+        requestProcessor.getGetBookieInfoStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                 TimeUnit.NANOSECONDS);
         return getBookieInfoResponse.build();
     }
@@ -98,6 +98,6 @@ private void sendResponse(GetBookieInfoResponse getBookieInfoResponse) {
                 .setGetBookieInfoResponse(getBookieInfoResponse);
         sendResponse(response.getStatus(),
                      response.build(),
-                     requestProcessor.getBookieInfoRequestStats);
+                     requestProcessor.getGetBookieInfoRequestStats());
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 9cd9fd5e1..7dc29a38b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -32,6 +32,7 @@
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
 
 /**
  * A base class for bookkeeper protocol v3 packet processors.
@@ -53,6 +54,40 @@ public PacketProcessorBaseV3(Request request, Channel channel,
 
     protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
         final long writeNanos = MathUtils.nowInNano();
+
+        final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
+        if (timeOut >= 0 && !channel.isWritable()) {
+            if (!requestProcessor.isBlacklisted(channel)) {
+                synchronized (channel) {
+                    if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) {
+                        final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut);
+                        while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) {
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(1);
+                            } catch (InterruptedException e) {
+                                break;
+                            }
+                        }
+                        if (!channel.isWritable()) {
+                            requestProcessor.blacklistChannel(channel);
+                            requestProcessor.handleNonWritableChannel(channel);
+                        }
+                    }
+                }
+            }
+
+            if (!channel.isWritable()) {
+                LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel,
+                        StringUtils.requestToString(request));
+                requestProcessor.getChannelWriteStats()
+                        .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
+                statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+                return;
+            } else {
+                requestProcessor.invalidateBlacklist(channel);
+            }
+        }
+
         channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
@@ -71,7 +106,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
                 }
             }
         });
-
     }
 
     protected boolean isVersionCompatible() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 4d69acf85..39436d438 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -126,6 +126,7 @@
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -939,7 +940,7 @@ private void writeAndFlush(final Channel channel,
                            final Object request,
                            final boolean allowFastFail) {
         if (channel == null) {
-            LOG.warn("Operation {} failed: channel == null", requestToString(request));
+            LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
             errorOut(key);
             return;
         }
@@ -952,7 +953,7 @@ private void writeAndFlush(final Channel channel,
 
         if (allowFastFail && !isWritable) {
             LOG.warn("Operation {} failed: TooManyRequestsException",
-                    requestToString(request));
+                    StringUtils.requestToString(request));
 
             errorOut(key, BKException.Code.TooManyRequestsException);
             return;
@@ -975,22 +976,11 @@ private void writeAndFlush(final Channel channel,
 
             channel.writeAndFlush(request, promise);
         } catch (Throwable e) {
-            LOG.warn("Operation {} failed", requestToString(request), e);
+            LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
-    private static String requestToString(Object request) {
-        if (request instanceof BookkeeperProtocol.Request) {
-            BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader();
-            return String.format("Req(txnId=%d,op=%s,version=%s)",
-                                 header.getTxnId(), header.getOperation(),
-                                 header.getVersion());
-        } else {
-            return request.toString();
-        }
-    }
-
     void errorOut(final CompletionKey key) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Removing completion key: {}", key);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 3d2a1eafe..12c02016d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -65,6 +65,8 @@ public ReadEntryProcessorV3(Request request,
                                 BookieRequestProcessor requestProcessor,
                                 ExecutorService fenceThreadPool) {
         super(request, channel, requestProcessor);
+        requestProcessor.onReadRequestStart(channel);
+
         this.readRequest = request.getReadRequest();
         this.ledgerId = readRequest.getLedgerId();
         this.entryId = readRequest.getEntryId();
@@ -313,6 +315,7 @@ protected void sendResponse(ReadResponse readResponse) {
         sendResponse(response.getStatus(),
                      response.build(),
                      reqStats);
+        requestProcessor.onReadRequestFinish();
     }
 
     //
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index fe68da42a..7747e5c0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -37,6 +37,7 @@
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@
     public WriteEntryProcessorV3(Request request, Channel channel,
                                  BookieRequestProcessor requestProcessor) {
         super(request, channel, requestProcessor);
+        requestProcessor.onAddRequestStart(channel);
     }
 
     // Returns null if there is no exception thrown
@@ -173,6 +175,12 @@ public void safeRun() {
         }
     }
 
+    @Override
+    protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
+        super.sendResponse(code, response, statsLogger);
+        requestProcessor.onAddRequestFinish();
+    }
+
     /**
      * this toString method filters out body and masterKey from the output.
      * masterKey contains the password of the ledger and body is customer data,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index f55edd155..73bf0187c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
+
 /**
  * Provided utilites for parsing network addresses, ledger-id from node paths
  * etc.
@@ -161,4 +163,20 @@ public static long stringToHierarchicalLedgerId(String...levelNodes) throws IOEx
         }
     }
 
+    /**
+     * Builds a string representation of the request without extra (e.g. binary) data.
+     *
+     * @param request
+     * @return string representation of request
+     */
+    public static String requestToString(Object request) {
+        if (request instanceof BookkeeperProtocol.Request) {
+            BookkeeperProtocol.BKPacketHeader header = ((BookkeeperProtocol.Request) request).getHeader();
+            return String.format("Req(txnId=%d,op=%s,version=%s)",
+                    header.getTxnId(), header.getOperation(),
+                    header.getVersion());
+        } else {
+            return request.toString();
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index f5e73a167..fbfe7c962 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -303,7 +303,8 @@ public void testAddEntryFailureOnDiskFull() throws Exception {
         Bookie bookie = new Bookie(conf);
         EntryLogger entryLogger = new EntryLogger(conf,
                 bookie.getLedgerDirsManager());
-        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        InterleavedLedgerStorage ledgerStorage =
+                ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage());
         ledgerStorage.entryLogger = entryLogger;
         // Create ledgers
         ledgerStorage.setMasterKey(1, "key".getBytes());
@@ -675,7 +676,7 @@ public void testConcurrentWriteAndReadCalls(String ledgerStorageClass, boolean e
         conf.setLedgerStorageClass(ledgerStorageClass);
         conf.setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled);
         Bookie bookie = new Bookie(conf);
-        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        CompactableLedgerStorage ledgerStorage = (CompactableLedgerStorage) bookie.ledgerStorage;
         Random rand = new Random(0);
 
         if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index ae3b4cdcc..1a01299a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -89,7 +89,7 @@ public void setUp() throws Exception {
         bookie = new Bookie(conf);
 
         activeLedgers = new SnapshotMap<Long, Boolean>();
-        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache;
+        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage()).ledgerCache;
     }
 
     @After
@@ -116,8 +116,8 @@ private void newLedgerCache() throws IOException {
         if (ledgerCache != null) {
             ledgerCache.close();
         }
-        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache = new LedgerCacheImpl(
-                conf, activeLedgers, bookie.getIndexDirsManager());
+        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage())
+                .ledgerCache = new LedgerCacheImpl(conf, activeLedgers, bookie.getIndexDirsManager());
         flushThread = new Thread() {
                 public void run() {
                     while (true) {
@@ -275,7 +275,8 @@ public void testLedgerCacheFlushFailureOnDiskFull() throws Exception {
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() });
 
         Bookie bookie = new Bookie(conf);
-        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        InterleavedLedgerStorage ledgerStorage =
+                ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage());
         LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache;
         // Create ledger index file
         ledgerStorage.setMasterKey(1, "key".getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
new file mode 100644
index 000000000..c3a8f6359
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -0,0 +1,140 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Strictly for testing: an InterleavedLedgerStorage whose entry logger sleeps before add/read/flush.
+ * Has to be in org.apache.bookkeeper.bookie to not introduce changes to InterleavedLedgerStorage.
+ */
+public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
+
+    public static final String PROP_SLOW_STORAGE_FLUSH_DELAY = "test.slowStorage.flushDelay";
+    public static final String PROP_SLOW_STORAGE_ADD_DELAY = "test.slowStorage.addDelay";
+    public static final String PROP_SLOW_STORAGE_GET_DELAY = "test.slowStorage.getDelay";
+
+    /**
+     * Entry logger that sleeps for the configured number of milliseconds before delegating. Strictly for testing.
+     */
+    public static class SlowEntryLogger extends EntryLogger {
+        public volatile long getDelay = 0; // ms slept before readEntry
+        public volatile long addDelay = 0; // ms slept before addEntry
+        public volatile long flushDelay = 0; // ms slept before flush
+
+        public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
+                throws IOException {
+            super(conf, ledgerDirsManager, listener);
+        }
+
+        public SlowEntryLogger setAddDelay(long delay) {
+            addDelay = delay;
+            return this;
+        }
+
+        public SlowEntryLogger setGetDelay(long delay) {
+            getDelay = delay;
+            return this;
+        }
+
+        public SlowEntryLogger setFlushDelay(long delay) {
+            flushDelay = delay;
+            return this;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            delayMs(flushDelay);
+            super.flush();
+        }
+
+        @Override
+        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+            delayMs(addDelay);
+            return super.addEntry(ledger, entry, rollLog);
+        }
+
+        @Override
+        public ByteBuf readEntry(long ledgerId, long entryId, long location)
+                throws IOException, Bookie.NoEntryException {
+            delayMs(getDelay);
+            return super.readEntry(ledgerId, entryId, location);
+        }
+
+        private static void delayMs(long delay) {
+            if (delay < 1) { // values below 1 ms disable the delay
+                return;
+            }
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // cut the delay short but keep the interrupt visible to callers
+            }
+        }
+
+    }
+
+    public SlowInterleavedLedgerStorage() {
+        super();
+    }
+
+    @Override
+    public void initialize(ServerConfiguration conf,
+                           LedgerManager ledgerManager,
+                           LedgerDirsManager ledgerDirsManager,
+                           LedgerDirsManager indexDirsManager,
+                           StateManager stateManager,
+                           CheckpointSource checkpointSource,
+                           Checkpointer checkpointer,
+                           StatsLogger statsLogger)
+            throws IOException {
+        super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
+                stateManager, checkpointSource, checkpointer, statsLogger);
+        // do not want to add these to the config class, so read them through the "raw" property interface
+        long getDelay = conf.getLong(PROP_SLOW_STORAGE_GET_DELAY, 0);
+        long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
+        long flushDelay = conf.getLong(PROP_SLOW_STORAGE_FLUSH_DELAY, 0);
+
+        entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this)
+                .setAddDelay(addDelay)
+                .setGetDelay(getDelay)
+                .setFlushDelay(flushDelay);
+    }
+
+    public void setAddDelay(long delay) {
+        ((SlowEntryLogger) entryLogger).setAddDelay(delay);
+    }
+
+    public void setGetDelay(long delay) {
+        ((SlowEntryLogger) entryLogger).setGetDelay(delay);
+    }
+
+    public void setFlushDelay(long delay) {
+        ((SlowEntryLogger) entryLogger).setFlushDelay(delay);
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
new file mode 100644
index 000000000..e12976d04
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowSortedLedgerStorage.java
@@ -0,0 +1,36 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Test-only storage: the default constructor supplies a {@link SlowInterleavedLedgerStorage} to the superclass.
+ */
+public class SlowSortedLedgerStorage extends SortedLedgerStorage {
+
+    SlowSortedLedgerStorage(InterleavedLedgerStorage ils) {
+        super(ils);
+    }
+
+    public SlowSortedLedgerStorage() {
+        super(new SlowInterleavedLedgerStorage());
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index f3ef1086c..322cdd089 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -225,7 +225,7 @@ public void testCheckpointAfterEntryLogRotated() throws Exception {
         EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
                 .getEntryLogManager();
         entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
-        long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
+        long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId();
         long currentLogId = entryLogManager.getCurrentLogId();
         log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
 
@@ -244,8 +244,8 @@ public void testCheckpointAfterEntryLogRotated() throws Exception {
         assertEquals(0, storage.memTable.kvmap.size());
         assertTrue(
             "current log " + currentLogId + " contains entries added from memtable should be forced to disk"
-            + " but least unflushed log is " + storage.entryLogger.getLeastUnflushedLogId(),
-            storage.entryLogger.getLeastUnflushedLogId() > currentLogId);
+            + " but least unflushed log is " + storage.getEntryLogger().getLeastUnflushedLogId(),
+            storage.getEntryLogger().getLeastUnflushedLogId() > currentLogId);
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
new file mode 100644
index 000000000..5454a702d
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
@@ -0,0 +1,469 @@
+package org.apache.bookkeeper.proto;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.nio.channels.FileChannel;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Journal;
+import org.apache.bookkeeper.bookie.SlowBufferedChannel;
+import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.SlowSortedLedgerStorage;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Tests for backpressure handling on the server side.
+ */
+// PowerMock usage is problematic here due to https://github.com/powermock/powermock/issues/822
+public class BookieBackpressureTest extends BookKeeperClusterTestCase
+        implements AddCallback, ReadCallback, ReadLastConfirmedCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BookieBackpressureTest.class);
+
+    byte[] ledgerPassword = "aaa".getBytes();
+
+    final byte[] data = new byte[8 * 1024];
+
+    // test related constants
+    static final int NUM_ENTRIES_TO_WRITE = 200;
+    static final int ENTRIES_IN_MEMTABLE = 2;
+    static final int MAX_PENDING = 2 * ENTRIES_IN_MEMTABLE + 1;
+    static final int NUM_OF_LEDGERS = 2 * MAX_PENDING;
+
+    DigestType digestType;
+
+    long getDelay;
+    long addDelay;
+    long flushDelay;
+
+    public BookieBackpressureTest() {
+        super(1);
+        this.digestType = DigestType.CRC32;
+
+        baseClientConf.setAddEntryTimeout(100);
+        baseClientConf.setAddEntryQuorumTimeout(100);
+        baseClientConf.setReadEntryTimeout(100);
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        getDelay = 0;
+        addDelay = 0;
+        flushDelay = 0;
+    }
+
+    static class SyncObj { // static: it reads nothing from the enclosing test instance
+        long lastConfirmed;
+        volatile int counter;
+        boolean value;
+        AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+        Enumeration<LedgerEntry> ls = null;
+
+        public SyncObj() {
+            counter = 0;
+            lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
+            value = false;
+        }
+
+        void setReturnCode(int rc) {
+            this.rc.compareAndSet(BKException.Code.OK, rc);
+        }
+
+        void setLedgerEntries(Enumeration<LedgerEntry> ls) {
+            this.ls = ls;
+        }
+    }
+
+    private void mockJournal(Bookie bookie, long getDelay, long addDelay, long flushDelay) throws Exception {
+        if (getDelay <= 0 && addDelay <= 0 && flushDelay <= 0) {
+            return;
+        }
+
+        List<Journal> journals = getJournals(bookie);
+        for (int i = 0; i < journals.size(); i++) {
+            Journal mock = spy(journals.get(i));
+            when(mock.getBufferedChannelBuilder()).thenReturn((FileChannel fc, int capacity) ->  {
+                SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity);
+                sbc.setAddDelay(addDelay);
+                sbc.setGetDelay(getDelay);
+                sbc.setFlushDelay(flushDelay);
+                return sbc;
+            });
+
+            journals.set(i, mock);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<Journal> getJournals(Bookie bookie) throws NoSuchFieldException, IllegalAccessException {
+        Field f = bookie.getClass().getDeclaredField("journals");
+        f.setAccessible(true);
+
+        return (List<Journal>) f.get(bookie);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowJournal() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        addDelay = 1;
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowJournalFlush() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        // to increase frequency of flushes
+        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+        flushDelay = 1;
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSlowJournal() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        addDelay = 1; // slow journal adds, mirroring testWriteNoBackpressureSlowJournal
+
+        doWritesWithBackpressure(0);
+    }
+
+
+    @Test
+    public void testWriteWithBackpressureSlowJournalFlush() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        // to increase frequency of flushes
+        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+        flushDelay = 1;
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testWriteNoBackpressureSortedStorage() throws Exception {
+        //disable backpressure for writes
+        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        // one for memtable being flushed, one for the part accepting the data
+        assertTrue("for the test, memtable should not keep more entries than allowed",
+                ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
+        bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesNoBackpressure(0);
+    }
+
+    @Test
+    public void testWriteWithBackpressureSortedStorage() throws Exception {
+        //enable backpressure with MAX_PENDING writes in progress
+        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        // one for memtable being flushed, one for the part accepting the data
+        assertTrue("for the test, memtable should not keep more entries than allowed",
+                ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
+        bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+
+        doWritesWithBackpressure(0);
+    }
+
+    @Test
+    public void testReadsNoBackpressure() throws Exception {
+        //disable backpressure for reads
+        bsConfs.get(0).setMaxReadsInProgressLimit(0);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+
+        final BookieRequestProcessor brp = generateDataAndDoReads(0);
+
+        Assert.assertThat("reads in progress should exceed MAX_PENDING",
+                brp.maxReadsInProgressCount(), Matchers.greaterThan(MAX_PENDING));
+    }
+
+    @Test
+    public void testReadsWithBackpressure() throws Exception {
+        //enable backpressure for reads
+        bsConfs.get(0).setMaxReadsInProgressLimit(MAX_PENDING);
+        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        bsConfs.get(0).setWriteBufferBytes(data.length);
+
+        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+
+        final BookieRequestProcessor brp = generateDataAndDoReads(0);
+
+        Assert.assertThat("reads in progress should NOT exceed MAX_PENDING ",
+                brp.maxReadsInProgressCount(), Matchers.lessThanOrEqualTo(MAX_PENDING));
+    }
+
+    private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exception {
+        BookieServer bks = bs.get(bkId);
+        bks.shutdown();
+        bks = new BookieServer(bsConfs.get(bkId));
+        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        bks.start();
+        bs.set(bkId, bks);
+
+        LOG.info("creating ledgers");
+        // Create ledgers
+        final int numEntriesForReads = 10;
+        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+            LOG.info("created ledger ID: {}", lhs[i].getId());
+        }
+
+        LOG.info("generating data for reads");
+        final CountDownLatch writesCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS);
+        for (int i = 0; i < numEntriesForReads; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> writesCompleteLatch.countDown(), null);
+            }
+        }
+        writesCompleteLatch.await();
+
+        LOG.info("issue bunch of async reads");
+        final CountDownLatch readsCompleteLatch = new CountDownLatch(numEntriesForReads * NUM_OF_LEDGERS);
+        for (int i = 0; i < numEntriesForReads; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncReadEntries(i, i, (rc, lh, seq, ctx) -> readsCompleteLatch.countDown(), null);
+            }
+        }
+        readsCompleteLatch.await();
+        LOG.info("reads finished");
+
+        return bks.getBookieRequestProcessor();
+    }
+
+    // here we expect that backpressure is disabled and number of writes in progress
+    // will exceed the limit
+    private void doWritesNoBackpressure(final int bkId) throws Exception {
+        BookieServer bks = bs.get(bkId);
+        bks.shutdown();
+        bks = new BookieServer(bsConfs.get(bkId));
+        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        bks.start();
+        bs.set(bkId, bks);
+
+        LOG.info("Creating ledgers");
+        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+            LOG.info("created ledger ID: {}", lhs[i].getId());
+        }
+
+        final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+
+        LOG.info("submitting writes");
+        for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> completeLatch.countDown(), null);
+            }
+        }
+
+        boolean exceededLimit = false;
+        BookieRequestProcessor brp = bks.getBookieRequestProcessor();
+        while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) {
+            int val = brp.maxAddsInProgressCount();
+            if (val > MAX_PENDING) {
+                exceededLimit = true;
+                break;
+            }
+            LOG.info("Waiting until all writes succeeded or maxAddsInProgressCount {} > MAX_PENDING {}",
+                    val, MAX_PENDING);
+        }
+
+        assertTrue("expected to exceed number of pending writes", exceededLimit);
+
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i].close();
+        }
+    }
+
+    // here we expect that backpressure is enabled and number of writes in progress
+    // will never exceed the limit
+    private void doWritesWithBackpressure(final int bkId) throws Exception {
+        BookieServer bks = bs.get(bkId);
+        bks.shutdown();
+        bks = new BookieServer(bsConfs.get(bkId));
+        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        bks.start();
+        bs.set(bkId, bks);
+
+        LOG.info("Creating ledgers");
+        LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i] = bkc.createLedger(1, 1, digestType, ledgerPassword);
+            LOG.info("created ledger ID: {}", lhs[i].getId());
+        }
+
+        final CountDownLatch completeLatch = new CountDownLatch(NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+
+        LOG.info("submitting writes");
+        for (int i = 0; i < NUM_ENTRIES_TO_WRITE; i++) {
+            for (int ledger = 0; ledger < NUM_OF_LEDGERS; ledger++) {
+                lhs[ledger].asyncAddEntry(data, (rc2, lh, entryId, ctx) -> {
+                    rc.compareAndSet(BKException.Code.OK, rc2);
+                    completeLatch.countDown();
+                }, null);
+            }
+        }
+
+        LOG.info("test submitted all writes");
+        BookieRequestProcessor brp = bks.getBookieRequestProcessor();
+        while (!completeLatch.await(1, TimeUnit.MILLISECONDS)) {
+            int val = brp.maxAddsInProgressCount();
+            assertTrue("writes in progress should not exceed limit, got " + val, val <= MAX_PENDING);
+            LOG.info("Waiting for all writes to succeed, left {} of {}",
+                    completeLatch.getCount(), NUM_ENTRIES_TO_WRITE * NUM_OF_LEDGERS);
+        }
+
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+
+        for (int i = 0; i < NUM_OF_LEDGERS; i++) {
+            lhs[i].close();
+        }
+    }
+
+    // AsyncCallback implementations: each expects ctx to be a SyncObj and notifies its waiters.
+    @Override
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        SyncObj sync = (SyncObj) ctx;
+        sync.setReturnCode(rc);
+        synchronized (sync) {
+            sync.counter++;
+            sync.notify();
+        }
+    }
+
+    @Override
+    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+        SyncObj sync = (SyncObj) ctx;
+        sync.setLedgerEntries(seq);
+        sync.setReturnCode(rc);
+        synchronized (sync) {
+            sync.value = true;
+            sync.notify();
+        }
+    }
+
+    @Override
+    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+        SyncObj sync = (SyncObj) ctx;
+        sync.setReturnCode(rc);
+        synchronized (sync) {
+            sync.lastConfirmed = lastConfirmed;
+            sync.notify();
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 90a51c89e..37d464734 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -74,6 +74,7 @@ public void setup() {
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getForceLedgerStats())
             .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
         when(requestProcessor.getForceLedgerRequestStats())
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index 354fcaf4b..4ebe01cfc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -47,6 +47,8 @@
  */
 public class TestBookieRequestProcessor {
 
+    final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
+
     @Test
     public void testConstructLongPollThreads() throws Exception {
         // long poll threads == read threads
@@ -134,7 +136,7 @@ public void testToString() {
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).build();
         Request request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
 
-        WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null);
+        WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
         String toString = writeEntryProcessorV3.toString();
         assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -152,7 +154,7 @@ public void testToString() {
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).setFlag(Flag.RECOVERY_ADD).setWriteFlags(0)
                 .build();
         request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build();
-        writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, null);
+        writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor);
         toString = writeEntryProcessorV3.toString();
         assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey",
@@ -188,7 +190,7 @@ public void testToString() {
                 .setMasterKey(ByteString.copyFrom("masterKey".getBytes()))
                 .setBody(ByteString.copyFrom("entrydata".getBytes())).build();
         request = Request.newBuilder().setHeader(header).setWriteLacRequest(writeLacRequest).build();
-        WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, null);
+        WriteLacProcessorV3 writeLacProcessorV3 = new WriteLacProcessorV3(request, null, requestProcessor);
         toString = writeLacProcessorV3.toString();
         assertFalse("writeLacProcessorV3's toString should have filtered out body", toString.contains("body"));
         assertFalse("writeLacProcessorV3's toString should have filtered out masterKey",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 8f54ddbc4..df7b1532b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -79,10 +79,13 @@ public void setup() {
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getAddEntryStats())
             .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
         when(requestProcessor.getAddRequestStats())
             .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+        when(requestProcessor.getChannelWriteStats())
+                .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("CHANNEL_WRITE"));
         processor = new WriteEntryProcessorV3(
             request,
             channel,
@@ -242,4 +245,41 @@ public void testNormalWritesOnWritableBookie() throws Exception {
         assertEquals(StatusCode.EOK, response.getStatus());
     }
 
+    @Test
+    public void testWritesWithClientNotAcceptingReponses() throws Exception {
+        when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(5L);
+
+        doAnswer(invocationOnMock -> {
+            Channel ch = invocationOnMock.getArgument(0);
+            ch.close();
+            return null;
+        }).when(requestProcessor).handleNonWritableChannel(any());
+
+        when(channel.isWritable()).thenReturn(false);
+
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            WriteCallback wc = invocationOnMock.getArgument(2);
+
+            wc.writeComplete(
+                    0,
+                    request.getAddRequest().getLedgerId(),
+                    request.getAddRequest().getEntryId(),
+                    null,
+                    null);
+            return null;
+        }).when(bookie).addEntry(
+                any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+
+        processor.run();
+
+        verify(bookie, times(1))
+                .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+        verify(requestProcessor, times(1)).handleNonWritableChannel(channel);
+        verify(channel, times(0)).writeAndFlush(any(Response.class));
+        verify(channel, times(1)).close();
+    }
+
 }
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 7bed0b743..83a39f46c 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -61,6 +61,12 @@
     <Method name="getBuffer" />
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
+  <Match>
+    <!-- https://github.com/spotbugs/spotbugs/issues/259 -->
+    <Class name="org.apache.bookkeeper.bookie.InterleavedLedgerStorage" />
+    <Method name="getLastAddConfirmed" />
+    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
+  </Match>
   <Match>
     <Class name="org.apache.bookkeeper.auth.AuthToken" />
     <Method name="getData" />


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services