You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/27 03:12:49 UTC

[GitHub] dlg99 closed pull request #1088: ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load)

dlg99 closed pull request #1088: ISSUE #1086 (@bug W-4146427@) Client-side backpressure in netty (Fixes: io.netty.util.internal.OutOfDirectMemoryError under continuous heavy load)
URL: https://github.com/apache/bookkeeper/pull/1088
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
coming from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 7d0fd5d4c..83e642186 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -71,6 +71,7 @@
     String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
 
     String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
+    String CLIENT_CHANNEL_WRITE_WAIT = "CLIENT_CHANNEL_WRITE_WAIT";
     String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
     String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
     String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 2d3446080..7dac5f0c3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -72,6 +72,7 @@
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -83,6 +84,7 @@
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.collections4.IteratorUtils;
@@ -114,6 +116,8 @@
     final EnumSet<WriteFlag> writeFlags;
     ScheduledFuture<?> timeoutFuture = null;
 
+    final long waitForWriteSetMs;
+
     /**
      * Invalid entry id. This value is returned from methods which
      * should return an entry id but there is no valid entry available.
@@ -135,6 +139,7 @@
     final Counter ensembleChangeCounter;
     final Counter lacUpdateHitsCounter;
     final Counter lacUpdateMissesCounter;
+    private final OpStatsLogger clientChannelWriteWaitStats;
 
     // This empty master key is used when an empty password is provided which is the hash of an empty string
     private static final byte[] emptyLedgerKey;
@@ -155,6 +160,7 @@
         this.enableParallelRecoveryRead = bk.getConf().getEnableParallelRecoveryRead();
         this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();
         this.writeFlags = writeFlags;
+        this.waitForWriteSetMs = bk.getConf().getWaitTimeoutOnBackpressureMillis();
 
         if (metadata.isClosed()) {
             lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -205,6 +211,8 @@ public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
         ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
         lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
         lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
+        clientChannelWriteWaitStats = bk.getStatsLogger()
+                .getOpStatsLogger(BookKeeperClientStats.CLIENT_CHANNEL_WRITE_WAIT);
         bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
                                           new Gauge<Integer>() {
                                               @Override
@@ -841,6 +849,27 @@ public LedgerEntry readLastEntry()
                                                               boolean isRecoveryRead) {
         PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead);
         if (!bk.isClosed()) {
+            // Waiting on the first one.
+            // This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive
+            // state later after N requests sent.
+            // Unfortunately it seems that alternatives are:
+            // - send reads one-by-one (up to the app)
+            // - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact)
+            // - block worker pool (not good)
+            // Even with this implementation one should be more concerned about OOME when all read responses arrive
+            // or about overloading bookies with these requests than about submission of many small requests.
+            // Naturally one of the solutions would be to submit smaller batches and in this case
+            // current implementation will prevent next batch from starting when bookie is
+            // unresponsive thus helpful enough.
+            DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry);
+            try {
+                if (!waitForWritable(ws, firstEntry, ws.size() - 1, waitForWriteSetMs)) {
+                    op.allowFailFastOnUnwritableChannel();
+                }
+            } finally {
+                ws.recycle();
+            }
+
             bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } else {
             op.future().completeExceptionally(BKException.create(ClientClosedException));
@@ -1065,6 +1094,86 @@ void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length
         doAsyncAddEntry(op);
     }
 
+    /**
+     * Checks whether enough bookies of the given write set have writable channels.
+     *
+     * @param writeSet bookie indices (into the current ensemble) the entry is written to
+     * @param key key used to pick the per-bookie channel from its pool
+     * @param allowedNonWritableCount non-writable bookies tolerated; negative is treated as 0
+     * @return true if at most {@code allowedNonWritableCount} bookies are non-writable
+     */
+    private boolean isWritesetWritable(DistributionSchedule.WriteSet writeSet,
+                                       long key, int allowedNonWritableCount) {
+        final int allowed = Math.max(0, allowedNonWritableCount);
+
+        int nonWritableCount = 0;
+        for (int i = 0; i < writeSet.size(); i++) {
+            // the write set holds ensemble indices; resolve the bookie it actually refers to
+            final int bookieIndex = writeSet.get(i);
+            if (!bk.getBookieClient().isWritable(metadata.currentEnsemble.get(bookieIndex), key)) {
+                nonWritableCount++;
+                if (nonWritableCount > allowed) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Waits, polling with a short backoff, until enough channels of the write set are writable.
+     *
+     * @param writeSet write set of the entry about to be sent
+     * @param key key used to pick the per-bookie channel
+     * @param allowedNonWritableCount number of non-writable bookies that can be tolerated
+     * @param durationMs max wait in ms; negative disables the check, 0 checks once without waiting
+     * @return true if the write set is writable (or the check is disabled), false otherwise
+     */
+    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long key,
+                                    int allowedNonWritableCount, long durationMs) {
+        if (durationMs < 0) {
+            return true;
+        }
+
+        final long startTime = MathUtils.nowInNano();
+        boolean success = isWritesetWritable(writeSet, key, allowedNonWritableCount);
+
+        if (!success && durationMs > 0) {
+            int backoff = 1;
+            final int maxBackoff = 4;
+            final long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs);
+            // re-check into 'success' so a recovered write set is reported; never sleep past the deadline
+            while (!(success = isWritesetWritable(writeSet, key, allowedNonWritableCount))) {
+                final long remainingNanos = deadline - MathUtils.nowInNano();
+                if (remainingNanos <= 0) {
+                    break;
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(Math.min(TimeUnit.MILLISECONDS.toNanos(backoff), remainingNanos));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    success = isWritesetWritable(writeSet, key, allowedNonWritableCount);
+                    break;
+                }
+                if (backoff < maxBackoff) {
+                    backoff++;
+                }
+            }
+            if (backoff > 1) {
+                LOG.info("Spent {} ms waiting for {} writable channels",
+                        MathUtils.elapsedMSec(startTime), writeSet.size() - allowedNonWritableCount);
+            }
+        }
+
+        final long elapsedNanos = MathUtils.elapsedNanos(startTime);
+        if (success) {
+            clientChannelWriteWaitStats.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+        } else {
+            clientChannelWriteWaitStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+        }
+        return success;
+    }
+
     protected void doAsyncAddEntry(final PendingAddOp op) {
         if (throttler != null) {
             throttler.acquire();
@@ -1109,6 +1218,15 @@ public String toString() {
             return;
         }
 
+        DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
+        try {
+            if (!waitForWritable(ws, op.getEntryId(), 0, waitForWriteSetMs)) {
+                op.allowFailFastOnUnwritableChannel();
+            }
+        } finally {
+            ws.recycle();
+        }
+
         try {
             bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
@@ -1154,6 +1272,7 @@ public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, final Obj
             cb.readLastConfirmedComplete(BKException.Code.OK, lastEntryId, ctx);
             return;
         }
+
         ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                 @Override
                 public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
@@ -1165,6 +1284,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
                     }
                 }
             };
+
         new ReadLastConfirmedOp(this, innercb).initiate();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 0eaf0b5d3..f67e1d674 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -237,6 +237,16 @@ public String toString() {
             return;
         }
 
+        // recycle the write set once the writability check is done, as LedgerHandle does
+        DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
+        try {
+            if (!waitForWritable(ws, op.getEntryId(), 0, waitForWriteSetMs)) {
+                op.allowFailFastOnUnwritableChannel();
+            }
+        } finally {
+            ws.recycle();
+        }
+
         try {
             bk.getMainWorkerPool().executeOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index eb9576633..61037f0d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -80,6 +80,8 @@
     boolean callbackTriggered;
     boolean hasRun;
 
+    boolean allowFailFast = false;
+
     static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallbackWithLatency cb, Object ctx) {
         PendingAddOp op = RECYCLER.get();
         op.lh = lh;
@@ -100,6 +102,7 @@ static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallbackWithLate
         op.callbackTriggered = false;
         op.hasRun = false;
         op.requestTimeNanos = Long.MAX_VALUE;
+        op.allowFailFast = false;
         op.qwcLatency = 0;
         return op;
     }
@@ -113,6 +116,11 @@ PendingAddOp enableRecoveryAdd() {
         return this;
     }
 
+    PendingAddOp allowFailFastOnUnwritableChannel() {
+        allowFailFast = true;
+        return this;
+    }
+
     void setEntryId(long entryId) {
         this.entryId = entryId;
     }
@@ -129,7 +137,7 @@ void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
 
         lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey,
-                entryId, toSend, this, bookieIndex, flags);
+                entryId, toSend, this, bookieIndex, flags, allowFailFast);
         ++pendingWriteRequests;
     }
 
@@ -444,6 +452,7 @@ private void recyclePendAddOpObject() {
         pendingWriteRequests = 0;
         callbackTriggered = false;
         hasRun = false;
+        allowFailFast = false;
 
         recyclerHandle.recycle(this);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a41207d53..385207881 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -81,6 +81,7 @@
 
     boolean parallelRead = false;
     final AtomicBoolean complete = new AtomicBoolean(false);
+    boolean allowFailFast = false;
 
     abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable {
 
@@ -391,6 +392,8 @@ synchronized BookieSocketAddress sendNextRead() {
                 return null;
             }
 
+            // ToDo: pick replica with writable PCBC. ISSUE #1239
+            // https://github.com/apache/bookkeeper/issues/1239
             int replica = nextReplicaIndexToReadFrom;
             int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom);
             nextReplicaIndexToReadFrom++;
@@ -473,6 +476,7 @@ boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer) {
         this.endEntryId = endEntryId;
         this.scheduler = scheduler;
         this.isRecoveryRead = isRecoveryRead;
+        this.allowFailFast = false;
         numPendingEntries = endEntryId - startEntryId + 1;
         requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize()
                 - getLedgerMetadata().getAckQuorumSize() + 1;
@@ -504,6 +508,10 @@ PendingReadOp parallelRead(boolean enabled) {
         return this;
     }
 
+    void allowFailFastOnUnwritableChannel() {
+        allowFailFast = true;
+    }
+
     public void submit() {
         lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 64fbbe6fa..7b64d1b39 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -128,6 +128,7 @@
     protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
     protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
     protected static final String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
+    protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
 
     // Bookie health check settings
     protected static final String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled";
@@ -418,7 +419,7 @@ public ClientConfiguration setClientReceiveBufferSize(int bufferSize) {
      * @return netty channel write buffer low water mark.
      */
     public int getClientWriteBufferLowWaterMark() {
-        return getInt(CLIENT_WRITEBUFFER_LOW_WATER_MARK, 32 * 1024);
+        return getInt(CLIENT_WRITEBUFFER_LOW_WATER_MARK, 384 * 1024);
     }
 
     /**
@@ -439,7 +440,7 @@ public ClientConfiguration setClientWriteBufferLowWaterMark(int waterMark) {
      * @return netty channel write buffer high water mark.
      */
     public int getClientWriteBufferHighWaterMark() {
-        return getInt(CLIENT_WRITEBUFFER_HIGH_WATER_MARK, 64 * 1024);
+        return getInt(CLIENT_WRITEBUFFER_HIGH_WATER_MARK, 512 * 1024);
     }
 
     /**
@@ -799,6 +800,34 @@ public ClientConfiguration setPCBCTimeoutTimerNumTicks(int numTicks) {
         return this;
     }
 
+    /**
+     * Get the timeout, in milliseconds, for waiting on channel writability before sending
+     * a request when bookie(s) are unresponsive (e.g. a bookie in a long GC pause).
+     *
+     * @return timeout in milliseconds:
+     *        a negative value disables the feature,
+     *        0 allows the request to fail immediately without waiting.
+     *        Default is -1 (disabled).
+     */
+    public long getWaitTimeoutOnBackpressureMillis() {
+        return getLong(WAIT_TIMEOUT_ON_BACKPRESSURE, -1);
+    }
+
+    /**
+     * Set the timeout, in milliseconds, for waiting on channel writability before sending
+     * a request when bookie(s) are unresponsive (e.g. a bookie in a long GC pause).
+     *
+     * @param value
+     *        timeout in milliseconds; a negative value disables the feature,
+     *        0 allows the request to fail immediately without waiting.
+     *        Default is -1 (disabled).
+     * @return client configuration.
+     */
+    public ClientConfiguration setWaitTimeoutOnBackpressureMillis(long value) {
+        setProperty(WAIT_TIMEOUT_ON_BACKPRESSURE, value);
+        return this;
+    }
+
     /**
      * Get the number of worker threads. This is the number of
      * worker threads used by bookkeeper client to submit operations.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index c160b204a..31c12fcbd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -149,6 +149,12 @@ private int getRc(int rc) {
         return faultyBookies;
     }
 
+    public boolean isWritable(BookieSocketAddress address, long key) {
+        final PerChannelBookieClientPool pcbcPool = lookupClient(address);
+        // if null, let the write initiate the connect or fail with whatever error it produces
+        return pcbcPool == null || pcbcPool.isWritable(key);
+    }
+
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
             SecurityHandlerFactory shFactory) throws SecurityException {
@@ -242,6 +248,18 @@ public void addEntry(final BookieSocketAddress addr,
                          final WriteCallback cb,
                          final Object ctx,
                          final int options) {
+        addEntry(addr, ledgerId, masterKey, entryId, toSend, cb, ctx, options, false);
+    }
+
+    public void addEntry(final BookieSocketAddress addr,
+                         final long ledgerId,
+                         final byte[] masterKey,
+                         final long entryId,
+                         final ByteBufList toSend,
+                         final WriteCallback cb,
+                         final Object ctx,
+                         final int options,
+                         final boolean allowFastFail) {
         final PerChannelBookieClientPool client = lookupClient(addr);
         if (client == null) {
             completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
@@ -255,7 +273,7 @@ public void addEntry(final BookieSocketAddress addr,
 
         client.obtain(ChannelReadyForAddEntryCallback.create(
                               this, toSend, ledgerId, entryId, addr,
-                              ctx, cb, options, masterKey),
+                                  ctx, cb, options, masterKey, allowFastFail),
                       ledgerId);
     }
 
@@ -291,11 +309,12 @@ public void safeRun() {
         private WriteCallback cb;
         private int options;
         private byte[] masterKey;
+        private boolean allowFastFail;
 
         static ChannelReadyForAddEntryCallback create(
                 BookieClient bookieClient, ByteBufList toSend, long ledgerId,
                 long entryId, BookieSocketAddress addr, Object ctx,
-                WriteCallback cb, int options, byte[] masterKey) {
+                WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail) {
             ChannelReadyForAddEntryCallback callback = RECYCLER.get();
             callback.bookieClient = bookieClient;
             callback.toSend = toSend;
@@ -306,6 +325,7 @@ static ChannelReadyForAddEntryCallback create(
             callback.cb = cb;
             callback.options = options;
             callback.masterKey = masterKey;
+            callback.allowFastFail = allowFastFail;
             return callback;
         }
 
@@ -316,7 +336,7 @@ public void operationComplete(final int rc,
                 bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
             } else {
                 pcbc.addEntry(ledgerId, masterKey, entryId,
-                              toSend, cb, ctx, options);
+                              toSend, cb, ctx, options, allowFastFail);
             }
 
             toSend.release();
@@ -346,6 +366,7 @@ public void recycle() {
             cb = null;
             options = -1;
             masterKey = null;
+            allowFastFail = false;
 
             recyclerHandle.recycle(this);
         }
@@ -382,6 +403,12 @@ public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
 
     public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
                           final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
+        readEntry(addr, ledgerId, entryId, cb, ctx, flags, masterKey, false);
+    }
+
+    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
+                          final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey,
+                          final boolean allowFastFail) {
         final PerChannelBookieClientPool client = lookupClient(addr);
         if (client == null) {
             cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
@@ -393,7 +420,7 @@ public void readEntry(final BookieSocketAddress addr, final long ledgerId, final
             if (rc != BKException.Code.OK) {
                 completeRead(rc, ledgerId, entryId, null, cb, ctx);
             } else {
-                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
+                pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey, allowFastFail);
             }
         }, ledgerId);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index e57b8a746..04471d5f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -84,14 +84,22 @@ public void intialize() {
         }
     }
 
-    @Override
-    public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
+    private PerChannelBookieClient getClient(long key) {
         if (1 == clients.length) {
-            clients[0].connectIfNeededAndDoOp(callback);
-            return;
+            return clients[0];
         }
         int idx = MathUtils.signSafeMod(key, clients.length);
-        clients[idx].connectIfNeededAndDoOp(callback);
+        return clients[idx];
+    }
+
+    @Override
+    public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
+        getClient(key).connectIfNeededAndDoOp(callback);
+    }
+
+    @Override
+    public boolean isWritable(long key) {
+        return getClient(key).isWritable();
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index b125d9721..3ca208940 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -213,6 +213,7 @@
     private final ClientAuthProvider.Factory authProviderFactory;
     private final ExtensionRegistry extRegistry;
     private final SecurityHandlerFactory shFactory;
+    private volatile boolean isWritable = true;
 
     public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
                                   BookieSocketAddress addr) throws SecurityException {
@@ -336,7 +337,7 @@ public SocketAddress getRemoteAddr() {
             public void disconnect() {
                 Channel c = channel;
                 if (c != null) {
-                    c.close();
+                    c.close().addListener(x -> makeWritable());
                 }
                 LOG.info("authplugin disconnected channel {}", channel);
             }
@@ -456,7 +457,7 @@ protected void initChannel(Channel ch) throws Exception {
 
         ChannelFuture future = bootstrap.connect(bookieAddr);
         future.addListener(new ConnectionFutureListener(startTime));
-
+        future.addListener(x -> makeWritable());
         return future;
     }
 
@@ -465,6 +466,22 @@ void cleanDisconnectAndClose() {
         close();
     }
 
+    /**
+     * Returns the cached writability flag of this per-channel bookie client (PCBC).
+     * @return true if the PCBC is considered writable
+     */
+    public boolean isWritable() {
+        return isWritable;
+    }
+
+    public void setWritable(boolean val) {
+        isWritable = val;
+    }
+
+    private void makeWritable() {
+        setWritable(true);
+    }
+
     void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
         boolean completeOpNow = false;
         int opRc = BKException.Code.OK;
@@ -558,6 +575,11 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
                   Object ctx, final int options) {
+        addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options, false);
+    }
+
+    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
+                  Object ctx, final int options, boolean allowFastFail) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
@@ -607,7 +629,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
             toSend.release();
             return;
         } else {
-            writeAndFlush(c, completionKey, request);
+            // addEntry times out on backpressure
+            writeAndFlush(c, completionKey, request, allowFastFail);
         }
     }
 
@@ -651,7 +674,7 @@ public void readEntryWaitForLACUpdate(final long ledgerId,
                                           ReadEntryCallback cb,
                                           Object ctx) {
         readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
-                          piggyBackEntry, cb, ctx, (short) 0, null);
+                          piggyBackEntry, cb, ctx, (short) 0, null, false);
     }
 
     /**
@@ -662,9 +685,10 @@ public void readEntry(final long ledgerId,
                           ReadEntryCallback cb,
                           Object ctx,
                           int flags,
-                          byte[] masterKey) {
+                          byte[] masterKey,
+                          boolean allowFastFail) {
         readEntryInternal(ledgerId, entryId, null, null, false,
-                          cb, ctx, (short) flags, masterKey);
+                          cb, ctx, (short) flags, masterKey, allowFastFail);
     }
 
     private void readEntryInternal(final long ledgerId,
@@ -675,7 +699,8 @@ private void readEntryInternal(final long ledgerId,
                                    final ReadEntryCallback cb,
                                    final Object ctx,
                                    int flags,
-                                   byte[] masterKey) {
+                                   byte[] masterKey,
+                                   boolean allowFastFail) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
@@ -749,7 +774,7 @@ private void readEntryInternal(final long ledgerId,
             }
         }
 
-        writeAndFlush(channel, completionKey, request);
+        writeAndFlush(channel, completionKey, request, allowFastFail);
     }
 
     public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
@@ -851,6 +876,7 @@ private void closeInternal(boolean permanent, boolean wait) {
             }
             toClose = channel;
             channel = null;
+            makeWritable();
         }
         if (toClose != null) {
             ChannelFuture cf = closeChannel(toClose);
@@ -864,18 +890,48 @@ private ChannelFuture closeChannel(Channel c) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel {}", c);
         }
+        return c.close().addListener(x -> makeWritable());
+    }
 
-        return c.close();
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        final Channel c = channel;
+        if (c == null || c.isWritable()) {
+            makeWritable();
+        }
+        super.channelWritabilityChanged(ctx);
     }
 
     private void writeAndFlush(final Channel channel,
                                final CompletionKey key,
                                final Object request) {
+        writeAndFlush(channel, key, request, false);
+    }
+
+    private void writeAndFlush(final Channel channel,
+                           final CompletionKey key,
+                           final Object request,
+                           final boolean allowFastFail) {
         if (channel == null) {
+            LOG.warn("Operation {} failed: channel == null", requestToString(request));
             errorOut(key);
             return;
         }
 
+        final boolean isChannelWritable = channel.isWritable();
+        if (isWritable != isChannelWritable) {
+            // isWritable is volatile so simple "isWritable = channel.isWritable()" would be slower
+            isWritable = isChannelWritable;
+        }
+
+        if (allowFastFail && !isWritable) {
+            LOG.warn("Operation {} failed: TooManyRequestsException",
+                    requestToString(request));
+
+            errorOut(key, BKException.Code.TooManyRequestsException);
+            return;
+        }
+
         try {
             final long startTime = MathUtils.nowInNano();
 
@@ -1305,6 +1361,8 @@ public void operationComplete(Future<Channel> future) throws Exception {
                         pendingOps = new ArrayDeque<>();
                     }
 
+                    makeWritable();
+
                     for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
                         pendingOp.operationComplete(rc, PerChannelBookieClient.this);
                     }
@@ -2019,6 +2077,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
                     rc = BKException.Code.OK;
                     channel = future.channel();
                     if (shFactory != null) {
+                        makeWritable();
                         initiateTLS();
                         return;
                     } else {
@@ -2072,6 +2131,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
             for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
                 pendingOp.operationComplete(rc, PerChannelBookieClient.this);
             }
+
+            makeWritable();
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 97a6a2694..aa7a5e944 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -40,6 +40,17 @@
      */
     void obtain(GenericCallback<PerChannelBookieClient> callback, long key);
 
+    /**
+     * Returns status of a client.
+     * It is suggested to delay/throttle requests to this channel if isWritable is false.
+     *
+     * @param key
+     * @return
+     */
+    default boolean isWritable(long key) {
+        return true;
+    }
+
     /**
      * record any read/write error on {@link PerChannelBookieClientPool}.
      */
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 5efd7bde7..919faa6cd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -19,6 +19,7 @@
 
 import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -68,6 +69,7 @@
 import org.junit.Before;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.Stubber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,6 +161,7 @@ public void setup() throws Exception {
         when(bk.getLedgerManager()).thenReturn(ledgerManager);
         when(bk.getLedgerIdGenerator()).thenReturn(ledgerIdGenerator);
         when(bk.getReturnRc(anyInt())).thenAnswer(invocationOnMock -> invocationOnMock.getArgument(0));
+        when(bookieClient.isWritable(any(), anyLong())).thenReturn(true);
 
         setupLedgerIdGenerator();
         setupCreateLedgerMetadata();
@@ -186,6 +189,7 @@ protected NullStatsLogger setupLoggers() {
         when(bk.getCreateOpLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getRecoverAddCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
         when(bk.getRecoverReadCountLogger()).thenReturn(nullStatsLogger.getOpStatsLogger("mock"));
+        when(bk.getAddOpUrCounter()).thenReturn(nullStatsLogger.getCounter("mock"));
         return nullStatsLogger;
     }
 
@@ -396,7 +400,7 @@ private void setupWriteLedgerMetadata() {
 
     @SuppressWarnings("unchecked")
     protected void setupBookieClientReadEntry() {
-        Answer<Void> answer = invokation -> {
+        final Stubber stub = doAnswer(invokation -> {
             Object[] args = invokation.getArguments();
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
             long ledgerId = (Long) args[1];
@@ -439,13 +443,19 @@ protected void setupBookieClientReadEntry() {
                 }
             });
             return null;
-        };
-        doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+        });
+
+        stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
                 any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
                 any(), anyInt());
-        doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+
+        stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
                 any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
                 any(), anyInt(), any());
+
+        stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
+                any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+                any(), anyInt(), any(), anyBoolean());
     }
 
     private byte[] extractEntryPayload(long ledgerId, long entryId, ByteBufList toSend)
@@ -468,7 +478,7 @@ protected void setupBookieClientReadEntry() {
 
     @SuppressWarnings("unchecked")
     protected void setupBookieClientAddEntry() {
-        doAnswer(invokation -> {
+        final Stubber stub = doAnswer(invokation -> {
             Object[] args = invokation.getArguments();
             BookkeeperInternalCallbacks.WriteCallback callback = (BookkeeperInternalCallbacks.WriteCallback) args[5];
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0];
@@ -506,11 +516,19 @@ protected void setupBookieClientAddEntry() {
                 }
             });
             return null;
-        }).when(bookieClient).addEntry(any(BookieSocketAddress.class),
-            anyLong(), any(byte[].class),
-            anyLong(), any(ByteBufList.class),
-            any(BookkeeperInternalCallbacks.WriteCallback.class),
-            any(), anyInt());
+        });
+
+        stub.when(bookieClient).addEntry(any(BookieSocketAddress.class),
+                anyLong(), any(byte[].class),
+                anyLong(), any(ByteBufList.class),
+                any(BookkeeperInternalCallbacks.WriteCallback.class),
+                any(), anyInt());
+
+        stub.when(bookieClient).addEntry(any(BookieSocketAddress.class),
+                anyLong(), any(byte[].class),
+                anyLong(), any(ByteBufList.class),
+                any(BookkeeperInternalCallbacks.WriteCallback.class),
+                any(), anyInt(), anyBoolean());
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 25dfaf411..dcd4b3473 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -23,6 +23,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 import java.util.Set;
@@ -46,6 +47,8 @@
 public class SlowBookieTest extends BookKeeperClusterTestCase {
     private static final Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);
 
+    final byte[] entry = "Test Entry".getBytes();
+
     public SlowBookieTest() {
         super(4);
         baseConf.setNumAddWorkerThreads(0);
@@ -109,7 +112,6 @@ public void testBookieFailureWithSlowBookie() throws Exception {
         final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
         final AtomicBoolean finished = new AtomicBoolean(false);
         final AtomicBoolean failTest = new AtomicBoolean(false);
-        final byte[] entry = "Test Entry".getBytes();
         Thread t = new Thread() {
                 public void run() {
                     try {
@@ -124,10 +126,13 @@ public void run() {
             };
         t.start();
         final CountDownLatch b0latch = new CountDownLatch(1);
+
         startNewBookie();
         sleepBookie(getBookie(0), b0latch);
+
         Thread.sleep(10000);
         b0latch.countDown();
+
         finished.set(true);
         t.join();
 
@@ -152,6 +157,148 @@ public void operationComplete(int rc, Set<LedgerFragment> badFragments) {
         assertEquals("There should be no missing fragments", 0, numFragments.get());
     }
 
+    @Test
+    public void testSlowBookieAndBackpressureOn() throws Exception {
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setReadTimeout(5)
+                .setAddEntryTimeout(1)
+                .setAddEntryQuorumTimeout(1)
+                .setNumChannelsPerBookie(1)
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setClientWriteBufferLowWaterMark(1)
+                .setClientWriteBufferHighWaterMark(entry.length - 1)
+                .setWaitTimeoutOnBackpressureMillis(5000);
+
+        final boolean expectWriteError = false;
+        final boolean expectFailedTest = false;
+
+        LedgerHandle lh = doBackpressureTest(entry, conf, expectWriteError, expectFailedTest, 2000);
+        assertTrue(lh.readLastConfirmed() < 5);
+    }
+
+    @Test
+    public void testSlowBookieAndFastFailOn() throws Exception {
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setReadTimeout(5)
+                .setAddEntryTimeout(1)
+                .setAddEntryQuorumTimeout(1)
+                .setNumChannelsPerBookie(1)
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setClientWriteBufferLowWaterMark(1)
+                .setClientWriteBufferHighWaterMark(2)
+                .setWaitTimeoutOnBackpressureMillis(0);
+
+        final boolean expectWriteError = true;
+        final boolean expectFailedTest = false;
+
+        LedgerHandle lh = doBackpressureTest(entry, conf, expectWriteError, expectFailedTest, 1000);
+        assertTrue(lh.readLastConfirmed() < 5);
+    }
+
+    @Test
+    public void testSlowBookieAndNoBackpressure() throws Exception {
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setReadTimeout(5)
+                .setAddEntryTimeout(1)
+                .setAddEntryQuorumTimeout(1)
+                .setNumChannelsPerBookie(1)
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setClientWriteBufferLowWaterMark(1)
+                .setClientWriteBufferHighWaterMark(entry.length - 1)
+                .setWaitTimeoutOnBackpressureMillis(-1);
+
+        final boolean expectWriteError = false;
+        final boolean expectFailedTest = false;
+
+        LedgerHandle lh = doBackpressureTest(entry, conf, expectWriteError, expectFailedTest, 4000);
+
+        assertTrue(lh.readLastConfirmed() > 90);
+    }
+
+    private LedgerHandle doBackpressureTest(byte[] entry, ClientConfiguration conf,
+                                    boolean expectWriteError, boolean expectFailedTest,
+                                    long sleepInMillis) throws Exception {
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] pwd = new byte[] {};
+        final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
+        lh.addEntry(entry);
+
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        final AtomicBoolean failTest = new AtomicBoolean(false);
+        final AtomicBoolean writeError = new AtomicBoolean(false);
+        Thread t = new Thread(() -> {
+            try {
+                int count = 0;
+                while (!finished.get()) {
+                    lh.asyncAddEntry(entry, (rc, lh1, entryId, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            finished.set(true);
+                            writeError.set(true);
+                        }
+                    }, null);
+                    if (++count > 100) {
+                        finished.set(true);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("Exception in add entry thread", e);
+                failTest.set(true);
+            }
+        });
+        final CountDownLatch b0latch = new CountDownLatch(1);
+        final CountDownLatch b0latch2 = new CountDownLatch(1);
+
+
+        sleepBookie(getBookie(0), b0latch);
+        sleepBookie(getBookie(1), b0latch2);
+
+        setTargetChannelState(bkc, getBookie(0), 0, false);
+        setTargetChannelState(bkc, getBookie(1), 0, false);
+
+        t.start();
+
+        Thread.sleep(sleepInMillis);
+
+        finished.set(true);
+
+        b0latch.countDown();
+        b0latch2.countDown();
+        setTargetChannelState(bkc, getBookie(0), 0, true);
+        setTargetChannelState(bkc, getBookie(1), 0, true);
+
+        t.join();
+
+        assertEquals("write error", expectWriteError, writeError.get());
+        assertEquals("test failure", expectFailedTest, failTest.get());
+
+        lh.close();
+
+        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
+        LedgerChecker lc = new LedgerChecker(bkc);
+        final CountDownLatch checklatch = new CountDownLatch(1);
+        final AtomicInteger numFragments = new AtomicInteger(-1);
+        lc.checkLedger(lh2, (rc, fragments) -> {
+            LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+            if (rc == BKException.Code.OK) {
+                numFragments.set(fragments.size());
+                LOG.error("Checked ledgers returned {} {}", rc, fragments);
+            }
+            checklatch.countDown();
+        });
+        checklatch.await();
+        assertEquals("There should be no missing fragments", 0, numFragments.get());
+
+        return lh2;
+    }
+
+    private void setTargetChannelState(BookKeeper bkc, BookieSocketAddress address,
+                                       long key, boolean state) throws Exception {
+        bkc.getBookieClient().lookupClient(address).obtain((rc, pcbc) -> {
+            pcbc.setWritable(state);
+        }, key);
+    }
+
     @Test
     public void testManyBookieFailureWithSlowBookies() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
@@ -163,7 +310,6 @@ public void testManyBookieFailureWithSlowBookies() throws Exception {
         final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
         final AtomicBoolean finished = new AtomicBoolean(false);
         final AtomicBoolean failTest = new AtomicBoolean(false);
-        final byte[] entry = "Test Entry".getBytes();
         Thread t = new Thread() {
                 public void run() {
                     try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index d96dce104..7dc62779e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -277,7 +277,8 @@ public void safeRun() {
                     return;
                 }
 
-                client.readEntry(1, 1, cb, null, BookieProtocol.FLAG_DO_FENCING, "00000111112222233333".getBytes());
+                client.readEntry(1, 1, cb, null, BookieProtocol.FLAG_DO_FENCING,
+                        "00000111112222233333".getBytes(), false);
             }
         });
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services