You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2022/03/10 00:03:23 UTC
[bookkeeper] branch master updated: check all bookies of writeset are writable
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e8cbd20 check all bookies of writeset are writable
e8cbd20 is described below
commit e8cbd20626b8702bbed186a4be37ce45cf6e9b41
Author: wuYin <wu...@gmail.com>
AuthorDate: Thu Mar 10 08:03:16 2022 +0800
check all bookies of writeset are writable
### Motivation
#1088 introduced a check that the ensemble is writable before sending requests, but the check should cover the bookies in the write set rather than the first few bookies of the current ensemble.
### Changes
Look up each bookie of the write set in the current ensemble by its index and check that it is writable.
Related change: https://github.com/apache/bookkeeper/pull/1088/files#diff-1d893bb31553b5e1f55c8301d04ae15f38e0d35f531f9dd22475128b7972ddf9R1108
Reviewers: Andrey Yegorov <None>
This closes #3055 from wuYin/writeset-writable
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 5 +++-
.../apache/bookkeeper/client/SlowBookieTest.java | 29 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 661499e..9c4760b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1224,7 +1224,9 @@ public class LedgerHandle implements WriteHandle {
int nonWritableCount = 0;
List<BookieId> currentEnsemble = getCurrentEnsemble();
for (int i = 0; i < sz; i++) {
- if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) {
+ int writeBookieIndex = writeSet.get(i);
+ if (writeBookieIndex < currentEnsemble.size()
+ && !clientCtx.getBookieClient().isWritable(currentEnsemble.get(writeBookieIndex), ledgerId)) {
nonWritableCount++;
if (nonWritableCount >= allowedNonWritableCount) {
return false;
@@ -1239,6 +1241,7 @@ public class LedgerHandle implements WriteHandle {
return true;
}
+ @VisibleForTesting
protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
int allowedNonWritableCount, long durationMs) {
if (durationMs < 0) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index ef17059..02ecf45 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -303,6 +304,34 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
}
@Test
+ public void testWritesetWriteableCheck() throws Exception {
+ final ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ BookKeeper bkc = new BookKeeper(conf);
+
+ byte[] pwd = new byte[]{};
+ final LedgerHandle lh = bkc.createLedger(4, 2, 2, BookKeeper.DigestType.CRC32, pwd);
+ try {
+ lh.addEntry(entry); // [b0, b1]
+ long entryId = lh.addEntry(entry); // [b1, b2]
+
+ long nextEntryId = entryId + 1;
+ RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(2, 2, 4);
+ DistributionSchedule.WriteSet writeSet = schedule.getWriteSet(nextEntryId);
+
+ // b2 or b3 is no more writeable
+ int slowBookieIndex = writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size()));
+ List<BookieId> curEns = lh.getCurrentEnsemble();
+ setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, false);
+
+ boolean isWriteable = lh.waitForWritable(writeSet, 0, 1000);
+ assertFalse("We should check b2,b3 both are writeable", isWriteable);
+ } finally {
+ lh.close();
+ }
+ }
+
+ @Test
public void testManyBookieFailureWithSlowBookies() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setReadTimeout(5)