You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2021/05/03 18:45:04 UTC

[bookkeeper] branch master updated: fix: Bookkeeper client throttling logic is based upon entryId instead of ledgerId

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f22d23  fix: Bookkeeper client throttling logic is based upon entryId instead of ledgerId
0f22d23 is described below

commit 0f22d238c225ec667c58e1c5029644478d636128
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon May 3 11:44:55 2021 -0700

    fix: Bookkeeper client throttling logic is based upon entryId instead of ledgerId
    
    Descriptions of the changes in this PR:
    
    Fixes: #2660
    
    ### Changes
    
    Change isWriteSetWritable() to use the ledgerId, instead of the entryId, for the client selection
    
    Master Issue: #2660
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Ivan Kelly <iv...@apache.org>
    
    This closes #2664 from dlg99/fix/2660
---
 .../java/org/apache/bookkeeper/client/LedgerHandle.java  | 16 ++++++++--------
 .../org/apache/bookkeeper/client/LedgerHandleAdv.java    |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 806dd8b..48818bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -880,7 +880,7 @@ public class LedgerHandle implements WriteHandle {
             // unresponsive thus helpful enough.
             DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry);
             try {
-                if (!waitForWritable(ws, firstEntry, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
+                if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
                     op.allowFailFastOnUnwritableChannel();
                 }
             } finally {
@@ -1213,7 +1213,7 @@ public class LedgerHandle implements WriteHandle {
     }
 
     private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet,
-                                       long key, int allowedNonWritableCount) {
+                                       int allowedNonWritableCount) {
         if (allowedNonWritableCount < 0) {
             allowedNonWritableCount = 0;
         }
@@ -1224,7 +1224,7 @@ public class LedgerHandle implements WriteHandle {
         int nonWritableCount = 0;
         List<BookieId> currentEnsemble = getCurrentEnsemble();
         for (int i = 0; i < sz; i++) {
-            if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) {
+            if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) {
                 nonWritableCount++;
                 if (nonWritableCount >= allowedNonWritableCount) {
                     return false;
@@ -1239,21 +1239,21 @@ public class LedgerHandle implements WriteHandle {
         return true;
     }
 
-    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long key,
+    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
                                     int allowedNonWritableCount, long durationMs) {
         if (durationMs < 0) {
             return true;
         }
 
         final long startTime = MathUtils.nowInNano();
-        boolean success = isWriteSetWritable(writeSet, key, allowedNonWritableCount);
+        boolean success = isWriteSetWritable(writeSet, allowedNonWritableCount);
 
         if (!success && durationMs > 0) {
             int backoff = 1;
             final int maxBackoff = 4;
             final long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs);
 
-            while (!isWriteSetWritable(writeSet, key, allowedNonWritableCount)) {
+            while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) {
                 if (MathUtils.nowInNano() < deadline) {
                     long maxSleep = MathUtils.elapsedMSec(startTime);
                     if (maxSleep < 0) {
@@ -1265,7 +1265,7 @@ public class LedgerHandle implements WriteHandle {
                         TimeUnit.MILLISECONDS.sleep(sleepMs);
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
-                        success = isWriteSetWritable(writeSet, key, allowedNonWritableCount);
+                        success = isWriteSetWritable(writeSet, allowedNonWritableCount);
                         break;
                     }
                     if (backoff <= maxBackoff) {
@@ -1340,7 +1340,7 @@ public class LedgerHandle implements WriteHandle {
 
         DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
         try {
-            if (!waitForWritable(ws, op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
+            if (!waitForWritable(ws, 0, clientCtx.getConf().waitForWriteSetMs)) {
                 op.allowFailFastOnUnwritableChannel();
             }
         } finally {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 2ea0e0a..1bdc653 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -264,7 +264,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
         }
 
         if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
-                    op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
+                    0, clientCtx.getConf().waitForWriteSetMs)) {
             op.allowFailFastOnUnwritableChannel();
         }