You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/30 00:38:25 UTC

[pulsar] 03/04: [fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a9938558a8a09414d9adadc520f4c797c750c8a7
Author: gaozhangmin <ga...@gmail.com>
AuthorDate: Tue Mar 29 01:00:20 2022 +0800

    [fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900)
    
    Fixes #14880
    
    ### Motivation
    
    Non durable cursor was not closed properly.
    
    ### Modifications
    For non durable cursor,  `cursor.asyncClose` did nothing. The proper way is `topic.getManagedLedger().asyncDeleteCursor`
    
    (cherry picked from commit 4e62ffc15714cfa49ed441f3ba7ededb866b9062)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java     | 13 ++++++++-----
 .../apache/pulsar/broker/transaction/TransactionTest.java   |  7 +++++--
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 66ce8f5..e2888d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.Codec;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
@@ -639,7 +640,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                 }
                             }
 
-                            closeCursor(managedCursor);
+                            closeCursor(SUBSCRIPTION_NAME);
                             callBack.recoverComplete();
                         }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
                                 .getExecutor(this)).exceptionally(e -> {
@@ -656,17 +657,19 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
             });
         }
 
-        private void closeCursor(ManagedCursor cursor) {
-            cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+        private void closeCursor(String subscriptionName) {
+            topic.getManagedLedger().asyncDeleteCursor(Codec.encode(subscriptionName),
+                    new AsyncCallbacks.DeleteCursorCallback() {
                 @Override
-                public void closeComplete(Object ctx) {
+                public void deleteCursorComplete(Object ctx) {
                     log.info("[{}]Transaction buffer snapshot recover cursor close complete.", topic.getName());
                 }
 
                 @Override
-                public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                     log.error("[{}]Transaction buffer snapshot recover cursor close fail.", topic.getName());
                 }
+
             }, null);
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 231c183..ae5e8c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -538,7 +538,7 @@ public class TransactionTest extends TransactionTestBase {
                 .getTopic("persistent://" + topic, false).get().get();
         persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
 
-        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
         doReturn("transaction-buffer-sub").when(managedCursor).getName();
         doReturn(true).when(managedCursor).hasMoreEntries();
         doAnswer(invocation -> {
@@ -579,6 +579,9 @@ public class TransactionTest extends TransactionTestBase {
         TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer3.getStats().state, "Ready"));
+        persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
+            assertTrue(internalStats.cursors.isEmpty());
+        });
         managedCursors.removeCursor("transaction-buffer-sub");
     }
 
@@ -893,4 +896,4 @@ public class TransactionTest extends TransactionTestBase {
         pulsarServiceList.forEach((pulsarService ->
                 pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
     }
-}
\ No newline at end of file
+}