You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by pr...@apache.org on 2022/01/11 18:31:29 UTC

[bookkeeper] branch master updated: checkAllLedgers in Auditor supports read throttle (#2973)

This is an automated email from the ASF dual-hosted git repository.

prashantkumar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 525a4a0  checkAllLedgers in Auditor supports read throttle (#2973)
525a4a0 is described below

commit 525a4a03f99afe1dca120477273537a6db46873c
Author: chenlin <15...@qq.com>
AuthorDate: Wed Jan 12 02:31:20 2022 +0800

    checkAllLedgers in Auditor supports read throttle (#2973)
    
    * support  read throttle in checkAllLedgers
    
    * using number of entries in flight (using Semaphore, releasing when processed) instead of guessing avg sizes and rate limiting.
    
    * using number of entries in flight (using Semaphore, releasing when processed) instead of guessing avg sizes and rate limiting.
    
    * check style
---
 .../apache/bookkeeper/client/LedgerChecker.java    | 60 +++++++++++++++++++---
 .../bookkeeper/conf/ServerConfiguration.java       | 12 +++++
 .../org/apache/bookkeeper/replication/Auditor.java |  2 +-
 3 files changed, 66 insertions(+), 8 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index f04ce24..87cbd5f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -51,6 +52,8 @@ public class LedgerChecker {
     public final BookieClient bookieClient;
     public final BookieWatcher bookieWatcher;
 
+    final Semaphore semaphore;
+
     static class InvalidFragmentException extends Exception {
         private static final long serialVersionUID = 1467201276417062353L;
     }
@@ -60,7 +63,7 @@ public class LedgerChecker {
      * call back to previous call back API which is waiting for it once it meets
      * the expected call backs from down.
      */
-    private static class ReadManyEntriesCallback implements ReadEntryCallback {
+    private class ReadManyEntriesCallback implements ReadEntryCallback {
         AtomicBoolean completed = new AtomicBoolean(false);
         final AtomicLong numEntries;
         final LedgerFragment fragment;
@@ -76,6 +79,7 @@ public class LedgerChecker {
         @Override
         public void readEntryComplete(int rc, long ledgerId, long entryId,
                 ByteBuf buffer, Object ctx) {
+            releasePermit();
             if (rc == BKException.Code.OK) {
                 if (numEntries.decrementAndGet() == 0
                         && !completed.getAndSet(true)) {
@@ -141,8 +145,40 @@ public class LedgerChecker {
     }
 
     public LedgerChecker(BookieClient client, BookieWatcher watcher) {
+        this(client, watcher, -1);
+    }
+
+    public LedgerChecker(BookKeeper bkc, int inFlightReadEntryNum) {
+        this(bkc.getBookieClient(), bkc.getBookieWatcher(), inFlightReadEntryNum);
+    }
+
+    public LedgerChecker(BookieClient client, BookieWatcher watcher, int inFlightReadEntryNum) {
         bookieClient = client;
         bookieWatcher = watcher;
+        if (inFlightReadEntryNum > 0) {
+            semaphore = new Semaphore(inFlightReadEntryNum);
+        } else {
+            semaphore = null;
+        }
+    }
+
+    /**
+     * Acquires a permit from the permit manager,
+     * blocking until one is available.
+     */
+    public void acquirePermit() throws InterruptedException {
+        if (null != semaphore) {
+            semaphore.acquire(1);
+        }
+    }
+
+    /**
+     * Releases one permit back to the permit manager.
+     */
+    public void releasePermit() {
+        if (null != semaphore) {
+            semaphore.release();
+        }
     }
 
     /**
@@ -157,7 +193,7 @@ public class LedgerChecker {
     private void verifyLedgerFragment(LedgerFragment fragment,
                                       GenericCallback<LedgerFragment> cb,
                                       Long percentageOfLedgerFragmentToBeVerified)
-            throws InvalidFragmentException, BKException {
+            throws InvalidFragmentException, BKException, InterruptedException {
         Set<Integer> bookiesToCheck = fragment.getBookiesIndexes();
         if (bookiesToCheck.isEmpty()) {
             cb.operationComplete(BKException.Code.OK, fragment);
@@ -188,7 +224,7 @@ public class LedgerChecker {
                                       int bookieIndex,
                                       GenericCallback<LedgerFragment> cb,
                                       long percentageOfLedgerFragmentToBeVerified)
-            throws InvalidFragmentException {
+            throws InvalidFragmentException, InterruptedException {
         long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
         long lastStored = fragment.getLastStoredEntryId(bookieIndex);
 
@@ -207,6 +243,7 @@ public class LedgerChecker {
             // fragment is on this bookie, but already know it's unavailable, so skip the call
             cb.operationComplete(BKException.Code.BookieHandleNotAvailableException, fragment);
         } else if (firstStored == lastStored) {
+            acquirePermit();
             ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
                     fragment, cb);
             bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored,
@@ -251,6 +288,7 @@ public class LedgerChecker {
             ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(entriesToBeVerified.size(),
                     fragment, cb);
             for (Long entryID: entriesToBeVerified) {
+                acquirePermit();
                 bookieClient.readEntry(bookie, fragment.getLedgerId(), entryID, manycb, null, BookieProtocol.FLAG_NONE);
             }
         }
@@ -261,7 +299,7 @@ public class LedgerChecker {
      * It is used to differentiate the cases where it has been written
      * but now cannot be read, and where it never has been written.
      */
-    private static class EntryExistsCallback implements ReadEntryCallback {
+    private class EntryExistsCallback implements ReadEntryCallback {
         AtomicBoolean entryMayExist = new AtomicBoolean(false);
         final AtomicInteger numReads;
         final GenericCallback<Boolean> cb;
@@ -275,6 +313,7 @@ public class LedgerChecker {
         @Override
         public void readEntryComplete(int rc, long ledgerId, long entryId,
                                       ByteBuf buffer, Object ctx) {
+            releasePermit();
             if (BKException.Code.NoSuchEntryException != rc && BKException.Code.NoSuchLedgerExistsException != rc
                     && BKException.Code.NoSuchLedgerExistsOnMetadataServerException != rc) {
                 entryMayExist.set(true);
@@ -395,9 +434,14 @@ public class LedgerChecker {
 
                 DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule().getWriteSet(entryToRead);
                 for (int i = 0; i < writeSet.size(); i++) {
-                    BookieId addr = curEnsemble.get(writeSet.get(i));
-                    bookieClient.readEntry(addr, lh.getId(), entryToRead,
-                                           eecb, null, BookieProtocol.FLAG_NONE);
+                    try {
+                        acquirePermit();
+                        BookieId addr = curEnsemble.get(writeSet.get(i));
+                        bookieClient.readEntry(addr, lh.getId(), entryToRead,
+                                eecb, null, BookieProtocol.FLAG_NONE);
+                    } catch (InterruptedException e) {
+                        LOG.error("InterruptedException when checking entry : {}", entryToRead, e);
+                    }
                 }
                 writeSet.recycle();
                 return;
@@ -429,6 +473,8 @@ public class LedgerChecker {
                         BKException.Code.IncorrectParameterException, r);
             } catch (BKException e) {
                 LOG.error("BKException when checking fragment : {}", r, e);
+            } catch (InterruptedException e) {
+                LOG.error("InterruptedException when checking fragment : {}", r, e);
             }
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 7af77d5..0f6dcc3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -214,6 +214,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC =
         "auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec";
     protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";
+    protected static final String IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER = "inFlightReadEntryNumInLedgerChecker";
+
 
     // Worker Thread parameters.
     protected static final String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
@@ -3724,6 +3726,16 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Get the maximum number of in-flight read entries allowed in the ledger checker.
+     * Default value is -1, which means in-flight reads in the ledger checker are unlimited.
+     *
+     * @return the maximum number of in-flight read entries.
+     */
+    public int getInFlightReadEntryNumInLedgerChecker(){
+        return getInt(IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER, -1);
+    }
+
+    /**
      * Set the rate of re-replication.
      *
      * @param rate bytes rate of re-replication.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 1b74a8e..8c69812 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -1248,7 +1248,7 @@ public class Auditor implements AutoCloseable {
         final BookKeeper localClient = getBookKeeper(conf);
         final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient);
         try {
-            final LedgerChecker checker = new LedgerChecker(localClient);
+            final LedgerChecker checker = new LedgerChecker(localClient, conf.getInFlightReadEntryNumInLedgerChecker());
 
             final CompletableFuture<Void> processFuture = new CompletableFuture<>();