You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2023/03/02 15:52:18 UTC
[bookkeeper] branch master updated: Avoid using thread-local WriteSet when possible (#3829)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 20aad8006e Avoid using thread-local WriteSet when possible (#3829)
20aad8006e is described below
commit 20aad8006e7bb21f034dfe61bc68a38968428d1e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 2 07:52:10 2023 -0800
Avoid using thread-local WriteSet when possible (#3829)
---
.../bookkeeper/client/DistributionSchedule.java | 12 ++++++++++++
.../org/apache/bookkeeper/client/LedgerChecker.java | 7 +++----
.../org/apache/bookkeeper/client/PendingAddOp.java | 21 +++++----------------
.../apache/bookkeeper/client/PendingWriteLacOp.java | 10 +++-------
.../client/RoundRobinDistributionSchedule.java | 18 +++++++++++++-----
5 files changed, 36 insertions(+), 32 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index ea9017467f..2646d3abc5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -136,11 +136,23 @@ public interface DistributionSchedule {
}
};
+ int getWriteQuorumSize();
+
/**
* Return the set of bookie indices to send the message to.
*/
WriteSet getWriteSet(long entryId);
+ /**
+ * Return the WriteSet bookie index for a given entry and index
+ * in the WriteSet.
+ *
+ * @param entryId
+ * @param writeSetIndex
+ * @return
+ */
+ int getWriteSetBookieIndex(long entryId, int writeSetIndex);
+
/**
* Return the set of bookies indices to send the messages to the whole ensemble.
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index a5cf3eec9a..6bbdc098de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -443,18 +443,17 @@ public class LedgerChecker {
}
});
- DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule().getWriteSet(entryToRead);
- for (int i = 0; i < writeSet.size(); i++) {
+ DistributionSchedule ds = lh.getDistributionSchedule();
+ for (int i = 0; i < ds.getWriteQuorumSize(); i++) {
try {
acquirePermit();
- BookieId addr = curEnsemble.get(writeSet.get(i));
+ BookieId addr = curEnsemble.get(ds.getWriteSetBookieIndex(entryToRead, i));
bookieClient.readEntry(addr, lh.getId(), entryToRead,
eecb, null, BookieProtocol.FLAG_NONE);
} catch (InterruptedException e) {
LOG.error("InterruptedException when checking entry : {}", entryToRead, e);
}
}
- writeSet.recycle();
return;
} else {
fragments.add(lastLedgerFragment);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 3a78696e88..05f740d33a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -198,14 +198,9 @@ class PendingAddOp implements WriteCallback {
// completes.
//
// We call sendAddSuccessCallback when unsetting to cover this case.
- DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
- try {
- if (!writeSet.contains(bookieIndex)) {
- lh.sendAddSuccessCallbacks();
- return;
- }
- } finally {
- writeSet.recycle();
+ if (!lh.distributionSchedule.hasEntry(entryId, bookieIndex)) {
+ lh.sendAddSuccessCallbacks();
+ return;
}
if (callbackTriggered) {
@@ -256,14 +251,8 @@ class PendingAddOp implements WriteCallback {
lh.maybeHandleDelayedWriteBookieFailure();
// Iterate over set and trigger the sendWriteRequests
- DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
-
- try {
- for (int i = 0; i < writeSet.size(); i++) {
- sendWriteRequest(ensemble, writeSet.get(i));
- }
- } finally {
- writeSet.recycle();
+ for (int i = 0; i < lh.distributionSchedule.getWriteQuorumSize(); i++) {
+ sendWriteRequest(ensemble, lh.distributionSchedule.getWriteSetBookieIndex(entryId, i));
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index f34c9f04dc..f9a5397daf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -79,13 +79,9 @@ class PendingWriteLacOp implements WriteLacCallback {
void initiate(ByteBufList toSend) {
this.toSend = toSend;
- DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(lac);
- try {
- for (int i = 0; i < writeSet.size(); i++) {
- sendWriteLacRequest(writeSet.get(i));
- }
- } finally {
- writeSet.recycle();
+
+ for (int i = 0; i < lh.distributionSchedule.getWriteQuorumSize(); i++) {
+ sendWriteLacRequest(lh.distributionSchedule.getWriteSetBookieIndex(lac, i));
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 81a3550071..6269012974 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -25,6 +25,7 @@ import io.netty.util.Recycler.Handle;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Map;
+import lombok.Getter;
import org.apache.bookkeeper.net.BookieId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
*/
public class RoundRobinDistributionSchedule implements DistributionSchedule {
private static final Logger LOG = LoggerFactory.getLogger(RoundRobinDistributionSchedule.class);
+ @Getter
private final int writeQuorumSize;
private final int ackQuorumSize;
private final int ensembleSize;
@@ -53,6 +55,11 @@ public class RoundRobinDistributionSchedule implements DistributionSchedule {
return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
}
+ /**
+ * Return the ensemble index of the bookie that occupies position
+ * {@code writeSetIndex} in the write set of {@code entryId}.
+ *
+ * @param entryId the entry id
+ * @param writeSetIndex the position inside the entry's write set
+ * @return {@code (entryId + writeSetIndex) % ensembleSize}, narrowed to int
+ */
+ @Override
+ public int getWriteSetBookieIndex(long entryId, int writeSetIndex) {
+ // Take the modulo in long arithmetic and only then narrow to int. A cast binds
+ // tighter than '%', so "(int) (entryId + writeSetIndex) % ensembleSize" would
+ // truncate the sum to 32 bits first and return a wrong (possibly negative)
+ // index once entryId exceeds Integer.MAX_VALUE.
+ return (int) ((entryId + writeSetIndex) % ensembleSize);
+ }
+
@Override
public WriteSet getEnsembleSet(long entryId) {
// for long poll reads and force ledger , we are trying all the bookies in the ensemble
@@ -418,12 +425,13 @@ public class RoundRobinDistributionSchedule implements DistributionSchedule {
@Override
public boolean hasEntry(long entryId, int bookieIndex) {
- WriteSet w = getWriteSet(entryId);
- try {
- return w.contains(bookieIndex);
- } finally {
- w.recycle();
+ for (int w = 0; w < writeQuorumSize; w++) {
+ if (bookieIndex == getWriteSetBookieIndex(entryId, w)) {
+ return true;
+ }
}
+
+ return false;
}
@Override