You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 12:07:40 UTC

[pulsar] branch branch-2.8 updated (7bd69f9 -> 8fc3667)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 7bd69f9  Catch exceptions in scheduled tasks to prevent unintended cancellation (#12853)
     new 8873a2b  [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)
     new 8fc3667  [Perf] Evaluate the current protocol version once (#13045)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 20 +++++++++++++-------
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java   | 18 +++++++++---------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 +-
 .../org/apache/pulsar/common/protocol/Commands.java  |  7 +++++--
 4 files changed, 28 insertions(+), 19 deletions(-)

[pulsar] 01/02: [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8873a2b8e9d06de458dc6412f3b0a0483b0bf5f9
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Nov 5 22:12:47 2021 +0200

    [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)
    
    * [ML] Avoid passing OpAddEntry across a thread boundary
    
    * Retain buffer in current thread
    
    (cherry picked from commit 6af747f515677796bba343997b2269ffd27cb601)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 20 +++++++++++++-------
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java   | 18 +++++++++---------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 +-
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b4387e6..4cbb75f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -699,10 +699,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }
 
-        OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
+        // retain buffer in this thread
+        buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing from different threads
-        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+        executor.executeOrdered(name, safeRun(() -> {
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
+            internalAsyncAddEntry(addOperation);
+        }));
     }
 
     @Override
@@ -711,10 +715,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }
 
-        OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
+        // retain buffer in this thread
+        buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing from different threads
-        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+        executor.executeOrdered(name, safeRun(() -> {
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
+            internalAsyncAddEntry(addOperation);
+        }));
     }
 
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
@@ -1508,9 +1516,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 // If op is used by another ledger handle, we need to close it and create a new one
                 if (existsOp.ledger != null) {
                     existsOp.close();
-                    existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
-                    // release the extra retain
-                    ReferenceCountUtil.release(existsOp.data);
+                    existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
                 }
                 existsOp.setLedger(currentLedger);
                 pendingAddEntries.add(existsOp);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 72dccca..0b96b59 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -75,16 +75,16 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         CLOSED
     }
 
-    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
-        OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
         }
         return op;
     }
 
-    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
-        OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         op.numberOfMessages = numberOfMessages;
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -92,11 +92,11 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         return op;
     }
 
-    private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+    private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
         OpAddEntry op = RECYCLER.get();
         op.ml = ml;
         op.ledger = null;
-        op.data = data.retain();
+        op.data = data;
         op.dataLength = data.readableBytes();
         op.callback = callback;
         op.ctx = ctx;
@@ -154,7 +154,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         }
         checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
                 lh.getId());
-        
+
         if (!checkAndCompleteOp(ctx)) {
             // means callback might have been completed by different thread (timeout task thread).. so do nothing
             return;
@@ -254,7 +254,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
 
     /**
      * Checks if add-operation is completed
-     * 
+     *
      * @return true if task is not already completed else returns false.
      */
     private boolean checkAndCompleteOp(Object ctx) {
@@ -275,7 +275,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
 
     /**
      * It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
-     * 
+     *
      * @param lh
      */
     void handleAddFailure(final LedgerHandle lh) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index eda4848..46029eb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2774,7 +2774,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         List<OpAddEntry> oldOps = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null);
+            OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
             if (i > 4) {
                 op.setLedger(mock(LedgerHandle.class));
             }

[pulsar] 02/02: [Perf] Evaluate the current protocol version once (#13045)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8fc36677fe3eb2344e6c42467ee393e4bab149fa
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Nov 30 15:10:39 2021 +0200

    [Perf] Evaluate the current protocol version once (#13045)
    
    - there's no need to dynamically evaluate it on every call
    
    (cherry picked from commit 724523f3051def9577d6bd27697866c99f4a7b0e)
---
 .../src/main/java/org/apache/pulsar/common/protocol/Commands.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 241ae87..01a7b01 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -124,6 +124,10 @@ public class Commands {
         }
     };
 
+    // Return the last ProtocolVersion enum value
+    private static final int CURRENT_PROTOCOL_VERSION =
+            ProtocolVersion.values()[ProtocolVersion.values().length - 1].getValue();
+
     private static BaseCommand localCmd(BaseCommand.Type type) {
         return LOCAL_BASE_COMMAND.get()
                 .clear()
@@ -1705,8 +1709,7 @@ public class Commands {
     }
 
     public static int getCurrentProtocolVersion() {
-        // Return the last ProtocolVersion enum value
-        return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getValue();
+        return CURRENT_PROTOCOL_VERSION;
     }
 
     /**