You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by pr...@apache.org on 2022/01/11 18:31:29 UTC
[bookkeeper] branch master updated: checkAllLedgers in Auditor supports read throttle (#2973)
This is an automated email from the ASF dual-hosted git repository.
prashantkumar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 525a4a0 checkAllLedgers in Auditor supports read throttle (#2973)
525a4a0 is described below
commit 525a4a03f99afe1dca120477273537a6db46873c
Author: chenlin <15...@qq.com>
AuthorDate: Wed Jan 12 02:31:20 2022 +0800
checkAllLedgers in Auditor supports read throttle (#2973)
* support read throttle in checkAllLedgers
* using number of entries in flight (using Semaphore, releasing when processed) instead of guessing avg sizes and rate limiting.
* using number of entries in flight (using Semaphore, releasing when processed) instead of guessing avg sizes and rate limiting.
* check style
---
.../apache/bookkeeper/client/LedgerChecker.java | 60 +++++++++++++++++++---
.../bookkeeper/conf/ServerConfiguration.java | 12 +++++
.../org/apache/bookkeeper/replication/Auditor.java | 2 +-
3 files changed, 66 insertions(+), 8 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index f04ce24..87cbd5f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,6 +52,8 @@ public class LedgerChecker {
public final BookieClient bookieClient;
public final BookieWatcher bookieWatcher;
+ final Semaphore semaphore;
+
static class InvalidFragmentException extends Exception {
private static final long serialVersionUID = 1467201276417062353L;
}
@@ -60,7 +63,7 @@ public class LedgerChecker {
* call back to previous call back API which is waiting for it once it meets
* the expected call backs from down.
*/
- private static class ReadManyEntriesCallback implements ReadEntryCallback {
+ private class ReadManyEntriesCallback implements ReadEntryCallback {
AtomicBoolean completed = new AtomicBoolean(false);
final AtomicLong numEntries;
final LedgerFragment fragment;
@@ -76,6 +79,7 @@ public class LedgerChecker {
@Override
public void readEntryComplete(int rc, long ledgerId, long entryId,
ByteBuf buffer, Object ctx) {
+ releasePermit();
if (rc == BKException.Code.OK) {
if (numEntries.decrementAndGet() == 0
&& !completed.getAndSet(true)) {
@@ -141,8 +145,40 @@ public class LedgerChecker {
}
public LedgerChecker(BookieClient client, BookieWatcher watcher) {
+ this(client, watcher, -1);
+ }
+
+ public LedgerChecker(BookKeeper bkc, int inFlightReadEntryNum) {
+ this(bkc.getBookieClient(), bkc.getBookieWatcher(), inFlightReadEntryNum);
+ }
+
+ public LedgerChecker(BookieClient client, BookieWatcher watcher, int inFlightReadEntryNum) {
bookieClient = client;
bookieWatcher = watcher;
+ if (inFlightReadEntryNum > 0) {
+ semaphore = new Semaphore(inFlightReadEntryNum);
+ } else {
+ semaphore = null;
+ }
+ }
+
+ /**
+ * Acquires one permit from the semaphore, blocking until one is available.
+ * Does nothing when throttling is disabled (no semaphore configured).
+ */
+ public void acquirePermit() throws InterruptedException {
+ if (null != semaphore) {
+ semaphore.acquire(1);
+ }
+ }
+
+ /**
+ * Releases one permit back to the semaphore, if throttling is enabled.
+ */
+ public void releasePermit() {
+ if (null != semaphore) {
+ semaphore.release();
+ }
}
/**
@@ -157,7 +193,7 @@ public class LedgerChecker {
private void verifyLedgerFragment(LedgerFragment fragment,
GenericCallback<LedgerFragment> cb,
Long percentageOfLedgerFragmentToBeVerified)
- throws InvalidFragmentException, BKException {
+ throws InvalidFragmentException, BKException, InterruptedException {
Set<Integer> bookiesToCheck = fragment.getBookiesIndexes();
if (bookiesToCheck.isEmpty()) {
cb.operationComplete(BKException.Code.OK, fragment);
@@ -188,7 +224,7 @@ public class LedgerChecker {
int bookieIndex,
GenericCallback<LedgerFragment> cb,
long percentageOfLedgerFragmentToBeVerified)
- throws InvalidFragmentException {
+ throws InvalidFragmentException, InterruptedException {
long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
long lastStored = fragment.getLastStoredEntryId(bookieIndex);
@@ -207,6 +243,7 @@ public class LedgerChecker {
// fragment is on this bookie, but already know it's unavailable, so skip the call
cb.operationComplete(BKException.Code.BookieHandleNotAvailableException, fragment);
} else if (firstStored == lastStored) {
+ acquirePermit();
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
fragment, cb);
bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored,
@@ -251,6 +288,7 @@ public class LedgerChecker {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(entriesToBeVerified.size(),
fragment, cb);
for (Long entryID: entriesToBeVerified) {
+ acquirePermit();
bookieClient.readEntry(bookie, fragment.getLedgerId(), entryID, manycb, null, BookieProtocol.FLAG_NONE);
}
}
@@ -261,7 +299,7 @@ public class LedgerChecker {
* It is used to differentiate the cases where it has been written
* but now cannot be read, and where it never has been written.
*/
- private static class EntryExistsCallback implements ReadEntryCallback {
+ private class EntryExistsCallback implements ReadEntryCallback {
AtomicBoolean entryMayExist = new AtomicBoolean(false);
final AtomicInteger numReads;
final GenericCallback<Boolean> cb;
@@ -275,6 +313,7 @@ public class LedgerChecker {
@Override
public void readEntryComplete(int rc, long ledgerId, long entryId,
ByteBuf buffer, Object ctx) {
+ releasePermit();
if (BKException.Code.NoSuchEntryException != rc && BKException.Code.NoSuchLedgerExistsException != rc
&& BKException.Code.NoSuchLedgerExistsOnMetadataServerException != rc) {
entryMayExist.set(true);
@@ -395,9 +434,14 @@ public class LedgerChecker {
DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule().getWriteSet(entryToRead);
for (int i = 0; i < writeSet.size(); i++) {
- BookieId addr = curEnsemble.get(writeSet.get(i));
- bookieClient.readEntry(addr, lh.getId(), entryToRead,
- eecb, null, BookieProtocol.FLAG_NONE);
+ try {
+ acquirePermit();
+ BookieId addr = curEnsemble.get(writeSet.get(i));
+ bookieClient.readEntry(addr, lh.getId(), entryToRead,
+ eecb, null, BookieProtocol.FLAG_NONE);
+ } catch (InterruptedException e) {
+ LOG.error("InterruptedException when checking entry : {}", entryToRead, e);
+ }
}
writeSet.recycle();
return;
@@ -429,6 +473,8 @@ public class LedgerChecker {
BKException.Code.IncorrectParameterException, r);
} catch (BKException e) {
LOG.error("BKException when checking fragment : {}", r, e);
+ } catch (InterruptedException e) {
+ LOG.error("InterruptedException when checking fragment : {}", r, e);
}
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 7af77d5..0f6dcc3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -214,6 +214,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String AUDITOR_ACQUIRE_CONCURRENT_OPEN_LEDGER_OPERATIONS_TIMEOUT_MSEC =
"auditorAcquireConcurrentOpenLedgerOperationsTimeOutMSec";
protected static final String REPLICATION_RATE_BY_BYTES = "replicationRateByBytes";
+ protected static final String IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER = "inFlightReadEntryNumInLedgerChecker";
+
// Worker Thread parameters.
protected static final String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
@@ -3724,6 +3726,16 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
}
/**
+ * Get the maximum number of in-flight read-entry requests allowed in the ledger checker.
+ * The default value is -1, which means the number of in-flight reads is unlimited.
+ *
+ * @return the maximum number of in-flight read-entry requests.
+ */
+ public int getInFlightReadEntryNumInLedgerChecker(){
+ return getInt(IN_FLIGHT_READ_ENTRY_NUM_IN_LEDGER_CHECKER, -1);
+ }
+
+ /**
* Set the rate of re-replication.
*
* @param rate bytes rate of re-replication.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 1b74a8e..8c69812 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -1248,7 +1248,7 @@ public class Auditor implements AutoCloseable {
final BookKeeper localClient = getBookKeeper(conf);
final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient);
try {
- final LedgerChecker checker = new LedgerChecker(localClient);
+ final LedgerChecker checker = new LedgerChecker(localClient, conf.getInFlightReadEntryNumInLedgerChecker());
final CompletableFuture<Void> processFuture = new CompletableFuture<>();