You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 12:19:45 UTC

[pulsar] 02/05: [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1fea2637973f2baed542629ce9a0e1c388a255d2
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Nov 5 22:12:47 2021 +0200

    [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)
    
    * [ML] Avoid passing OpAddEntry across a thread boundary
    
    * Retain buffer in current thread
    
    (cherry picked from commit 6af747f515677796bba343997b2269ffd27cb601)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 20 +++++++++++++-------
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java   | 18 +++++++++---------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 +-
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0bc88c5..032e53e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -699,10 +699,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }
 
-        OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
+        // retain buffer in this thread
+        buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing from different threads
-        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+        executor.executeOrdered(name, safeRun(() -> {
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
+            internalAsyncAddEntry(addOperation);
+        }));
     }
 
     @Override
@@ -711,10 +715,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }
 
-        OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
+        // retain buffer in this thread
+        buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing from different threads
-        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+        executor.executeOrdered(name, safeRun(() -> {
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
+            internalAsyncAddEntry(addOperation);
+        }));
     }
 
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
@@ -1509,9 +1517,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 // If op is used by another ledger handle, we need to close it and create a new one
                 if (existsOp.ledger != null) {
                     existsOp.close();
-                    existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
-                    // release the extra retain
-                    ReferenceCountUtil.release(existsOp.data);
+                    existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
                 }
                 existsOp.setLedger(currentLedger);
                 pendingAddEntries.add(existsOp);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 9106b4f..e08b204 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -75,16 +75,16 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         CLOSED
     }
 
-    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
-        OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
         }
         return op;
     }
 
-    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
-        OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         op.numberOfMessages = numberOfMessages;
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -92,11 +92,11 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         return op;
     }
 
-    private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+    private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
         OpAddEntry op = RECYCLER.get();
         op.ml = ml;
         op.ledger = null;
-        op.data = data.retain();
+        op.data = data;
         op.dataLength = data.readableBytes();
         op.callback = callback;
         op.ctx = ctx;
@@ -155,7 +155,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         }
         checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
                 lh.getId());
-        
+
         if (!checkAndCompleteOp(ctx)) {
             // means callback might have been completed by different thread (timeout task thread).. so do nothing
             return;
@@ -255,7 +255,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
 
     /**
      * Checks if add-operation is completed
-     * 
+     *
      * @return true if task is not already completed else returns false.
      */
     private boolean checkAndCompleteOp(Object ctx) {
@@ -276,7 +276,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
 
     /**
      * It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
-     * 
+     *
      * @param lh
      */
     void handleAddFailure(final LedgerHandle lh) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4358c2f..8ce8895 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2829,7 +2829,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         List<OpAddEntry> oldOps = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null);
+            OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
             if (i > 4) {
                 op.setLedger(mock(LedgerHandle.class));
             }