You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2021/05/03 18:45:04 UTC
[bookkeeper] branch master updated: fix: Bookkeeper client
throttling logic is based upon entryId instead of ledgerId
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 0f22d23 fix: Bookkeeper client throttling logic is based upon entryId instead of ledgerId
0f22d23 is described below
commit 0f22d238c225ec667c58e1c5029644478d636128
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon May 3 11:44:55 2021 -0700
fix: Bookkeeper client throttling logic is based upon entryId instead of ledgerId
Descriptions of the changes in this PR:
Fixes: #2660
### Changes
isWriteSetWritable() to use ledgerId for the client selection
Master Issue: #2660
Reviewers: Enrico Olivelli <eo...@gmail.com>, Ivan Kelly <iv...@apache.org>
This closes #2664 from dlg99/fix/2660
---
.../java/org/apache/bookkeeper/client/LedgerHandle.java | 16 ++++++++--------
.../org/apache/bookkeeper/client/LedgerHandleAdv.java | 2 +-
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 806dd8b..48818bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -880,7 +880,7 @@ public class LedgerHandle implements WriteHandle {
// unresponsive thus helpful enough.
DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry);
try {
- if (!waitForWritable(ws, firstEntry, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
+ if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
@@ -1213,7 +1213,7 @@ public class LedgerHandle implements WriteHandle {
}
private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet,
- long key, int allowedNonWritableCount) {
+ int allowedNonWritableCount) {
if (allowedNonWritableCount < 0) {
allowedNonWritableCount = 0;
}
@@ -1224,7 +1224,7 @@ public class LedgerHandle implements WriteHandle {
int nonWritableCount = 0;
List<BookieId> currentEnsemble = getCurrentEnsemble();
for (int i = 0; i < sz; i++) {
- if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) {
+ if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) {
nonWritableCount++;
if (nonWritableCount >= allowedNonWritableCount) {
return false;
@@ -1239,21 +1239,21 @@ public class LedgerHandle implements WriteHandle {
return true;
}
- protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long key,
+ protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
int allowedNonWritableCount, long durationMs) {
if (durationMs < 0) {
return true;
}
final long startTime = MathUtils.nowInNano();
- boolean success = isWriteSetWritable(writeSet, key, allowedNonWritableCount);
+ boolean success = isWriteSetWritable(writeSet, allowedNonWritableCount);
if (!success && durationMs > 0) {
int backoff = 1;
final int maxBackoff = 4;
final long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs);
- while (!isWriteSetWritable(writeSet, key, allowedNonWritableCount)) {
+ while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) {
if (MathUtils.nowInNano() < deadline) {
long maxSleep = MathUtils.elapsedMSec(startTime);
if (maxSleep < 0) {
@@ -1265,7 +1265,7 @@ public class LedgerHandle implements WriteHandle {
TimeUnit.MILLISECONDS.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- success = isWriteSetWritable(writeSet, key, allowedNonWritableCount);
+ success = isWriteSetWritable(writeSet, allowedNonWritableCount);
break;
}
if (backoff <= maxBackoff) {
@@ -1340,7 +1340,7 @@ public class LedgerHandle implements WriteHandle {
DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
try {
- if (!waitForWritable(ws, op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
+ if (!waitForWritable(ws, 0, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 2ea0e0a..1bdc653 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -264,7 +264,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
}
if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
- op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
+ 0, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}