You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2022/03/10 00:03:23 UTC

[bookkeeper] branch master updated: check all bookies of writeset are writable

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e8cbd20  check all bookies of writeset are writable
e8cbd20 is described below

commit e8cbd20626b8702bbed186a4be37ce45cf6e9b41
Author: wuYin <wu...@gmail.com>
AuthorDate: Thu Mar 10 08:03:16 2022 +0800

    check all bookies of writeset are writable
    
    ### Motivation
    
    #1088 introduced a check that the ensemble is writable before sending requests, but the check should cover the bookies in the entry's write set rather than the first few bookies of the current ensemble.
    
    ### Changes
    
    Look up the bookies of the write set in the current ensemble and check that each of them is writable.
    
    Related change: https://github.com/apache/bookkeeper/pull/1088/files#diff-1d893bb31553b5e1f55c8301d04ae15f38e0d35f531f9dd22475128b7972ddf9R1108
    
    
    Reviewers: Andrey Yegorov <None>
    
    This closes #3055 from wuYin/writeset-writable
---
 .../org/apache/bookkeeper/client/LedgerHandle.java |  5 +++-
 .../apache/bookkeeper/client/SlowBookieTest.java   | 29 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 661499e..9c4760b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1224,7 +1224,9 @@ public class LedgerHandle implements WriteHandle {
         int nonWritableCount = 0;
         List<BookieId> currentEnsemble = getCurrentEnsemble();
         for (int i = 0; i < sz; i++) {
-            if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) {
+            int writeBookieIndex = writeSet.get(i);
+            if (writeBookieIndex < currentEnsemble.size()
+                && !clientCtx.getBookieClient().isWritable(currentEnsemble.get(writeBookieIndex), ledgerId)) {
                 nonWritableCount++;
                 if (nonWritableCount >= allowedNonWritableCount) {
                     return false;
@@ -1239,6 +1241,7 @@ public class LedgerHandle implements WriteHandle {
         return true;
     }
 
+    @VisibleForTesting
     protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
                                     int allowedNonWritableCount, long durationMs) {
         if (durationMs < 0) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index ef17059..02ecf45 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -303,6 +304,34 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testWritesetWriteableCheck() throws Exception {
+        final ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] pwd = new byte[]{};
+        final LedgerHandle lh = bkc.createLedger(4, 2, 2, BookKeeper.DigestType.CRC32, pwd);
+        try {
+            lh.addEntry(entry); // [b0, b1]
+            long entryId = lh.addEntry(entry); // [b1, b2]
+
+            long nextEntryId = entryId + 1;
+            RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(2, 2, 4);
+            DistributionSchedule.WriteSet writeSet = schedule.getWriteSet(nextEntryId);
+
+            // b2 or b3 is no more writeable
+            int slowBookieIndex = writeSet.get(ThreadLocalRandom.current().nextInt(writeSet.size()));
+            List<BookieId> curEns = lh.getCurrentEnsemble();
+            setTargetChannelState(bkc, curEns.get(slowBookieIndex), 0, false);
+
+            boolean isWriteable = lh.waitForWritable(writeSet, 0, 1000);
+            assertFalse("We should check b2,b3 both are writeable", isWriteable);
+        } finally {
+            lh.close();
+        }
+    }
+
+    @Test
     public void testManyBookieFailureWithSlowBookies() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         conf.setReadTimeout(5)