You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2023/03/02 15:52:18 UTC

[bookkeeper] branch master updated: Avoid using thread-local WriteSet when possible (#3829)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 20aad8006e Avoid using thread-local WriteSet when possible (#3829)
20aad8006e is described below

commit 20aad8006e7bb21f034dfe61bc68a38968428d1e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 2 07:52:10 2023 -0800

    Avoid using thread-local WriteSet when possible (#3829)
---
 .../bookkeeper/client/DistributionSchedule.java     | 12 ++++++++++++
 .../org/apache/bookkeeper/client/LedgerChecker.java |  7 +++----
 .../org/apache/bookkeeper/client/PendingAddOp.java  | 21 +++++----------------
 .../apache/bookkeeper/client/PendingWriteLacOp.java | 10 +++-------
 .../client/RoundRobinDistributionSchedule.java      | 18 +++++++++++++-----
 5 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index ea9017467f..2646d3abc5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -136,11 +136,23 @@ public interface DistributionSchedule {
             }
         };
 
+    int getWriteQuorumSize();
+
     /**
      * Return the set of bookie indices to send the message to.
      */
     WriteSet getWriteSet(long entryId);
 
+    /**
+     * Return the WriteSet bookie index for a given entry and index
+     * in the WriteSet.
+     *
+     * @param entryId
+     * @param writeSetIndex
+     * @return
+     */
+    int getWriteSetBookieIndex(long entryId, int writeSetIndex);
+
     /**
      * Return the set of bookies indices to send the messages to the whole ensemble.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index a5cf3eec9a..6bbdc098de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -443,18 +443,17 @@ public class LedgerChecker {
                                                   }
                                               });
 
-                DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule().getWriteSet(entryToRead);
-                for (int i = 0; i < writeSet.size(); i++) {
+                DistributionSchedule ds = lh.getDistributionSchedule();
+                for (int i = 0; i < ds.getWriteQuorumSize(); i++) {
                     try {
                         acquirePermit();
-                        BookieId addr = curEnsemble.get(writeSet.get(i));
+                        BookieId addr = curEnsemble.get(ds.getWriteSetBookieIndex(entryToRead, i));
                         bookieClient.readEntry(addr, lh.getId(), entryToRead,
                                 eecb, null, BookieProtocol.FLAG_NONE);
                     } catch (InterruptedException e) {
                         LOG.error("InterruptedException when checking entry : {}", entryToRead, e);
                     }
                 }
-                writeSet.recycle();
                 return;
             } else {
                 fragments.add(lastLedgerFragment);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 3a78696e88..05f740d33a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -198,14 +198,9 @@ class PendingAddOp implements WriteCallback {
         // completes.
         //
         // We call sendAddSuccessCallback when unsetting t cover this case.
-        DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
-        try {
-            if (!writeSet.contains(bookieIndex)) {
-                lh.sendAddSuccessCallbacks();
-                return;
-            }
-        } finally {
-            writeSet.recycle();
+        if (!lh.distributionSchedule.hasEntry(entryId, bookieIndex)) {
+            lh.sendAddSuccessCallbacks();
+            return;
         }
 
         if (callbackTriggered) {
@@ -256,14 +251,8 @@ class PendingAddOp implements WriteCallback {
         lh.maybeHandleDelayedWriteBookieFailure();
 
         // Iterate over set and trigger the sendWriteRequests
-        DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
-
-        try {
-            for (int i = 0; i < writeSet.size(); i++) {
-                sendWriteRequest(ensemble, writeSet.get(i));
-            }
-        } finally {
-            writeSet.recycle();
+        for (int i = 0; i < lh.distributionSchedule.getWriteQuorumSize(); i++) {
+            sendWriteRequest(ensemble, lh.distributionSchedule.getWriteSetBookieIndex(entryId, i));
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index f34c9f04dc..f9a5397daf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -79,13 +79,9 @@ class PendingWriteLacOp implements WriteLacCallback {
 
     void initiate(ByteBufList toSend) {
         this.toSend = toSend;
-        DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(lac);
-        try {
-            for (int i = 0; i < writeSet.size(); i++) {
-                sendWriteLacRequest(writeSet.get(i));
-            }
-        } finally {
-            writeSet.recycle();
+
+        for (int i = 0; i < lh.distributionSchedule.getWriteQuorumSize(); i++) {
+            sendWriteLacRequest(lh.distributionSchedule.getWriteSetBookieIndex(lac, i));
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 81a3550071..6269012974 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -25,6 +25,7 @@ import io.netty.util.Recycler.Handle;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Map;
+import lombok.Getter;
 import org.apache.bookkeeper.net.BookieId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
  */
 public class RoundRobinDistributionSchedule implements DistributionSchedule {
     private static final Logger LOG = LoggerFactory.getLogger(RoundRobinDistributionSchedule.class);
+    @Getter
     private final int writeQuorumSize;
     private final int ackQuorumSize;
     private final int ensembleSize;
@@ -53,6 +55,11 @@ public class RoundRobinDistributionSchedule implements DistributionSchedule {
         return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
     }
 
+    @Override
+    public int getWriteSetBookieIndex(long entryId, int writeSetIndex) {
+        return (int) ((entryId + writeSetIndex) % ensembleSize); // take the modulo as a long, then narrow
+    }
+
     @Override
     public WriteSet getEnsembleSet(long entryId) {
         // for long poll reads and force ledger , we are trying all the bookies in the ensemble
@@ -418,12 +425,13 @@ public class RoundRobinDistributionSchedule implements DistributionSchedule {
 
     @Override
     public boolean hasEntry(long entryId, int bookieIndex) {
-        WriteSet w = getWriteSet(entryId);
-        try {
-            return w.contains(bookieIndex);
-        } finally {
-            w.recycle();
+        for (int w = 0; w < writeQuorumSize; w++) { // visit every position of the entry's write set
+            if (bookieIndex == getWriteSetBookieIndex(entryId, w)) {
+                return true;
+            }
         }
+
+        return false; // no write-set position maps to bookieIndex
     }
 
     @Override