You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/30 00:38:22 UTC

[pulsar] branch branch-2.9 updated (f27701a -> 970c118)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from f27701a  TieredStorage: add debug information (#14907)
     new b659cdc  [fix][broker] Fix wrong state for non-durable cursor (#14869)
     new 5ea98b4  [fix][broker] Fix topic policy reader close bug. (#14897)
     new a993855  [fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900)
     new 970c118  # Motivation (#14895)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../mledger/impl/NonDurableCursorImpl.java         |   4 +-
 .../SystemTopicBasedTopicPoliciesService.java      |   4 +-
 .../broker/service/persistent/PersistentTopic.java |   7 +-
 .../buffer/impl/TopicTransactionBuffer.java        |  13 ++-
 .../systopic/PartitionedSystemTopicTest.java       |   5 +
 .../pulsar/broker/transaction/TransactionTest.java |   7 +-
 .../buffer/TransactionBufferCloseTest.java         | 120 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  21 +++-
 9 files changed, 170 insertions(+), 13 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java

[pulsar] 01/04: [fix][broker] Fix wrong state for non-durable cursor (#14869)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b659cdce4871b1553b134e2e7e668e9dc1513318
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Mon Mar 28 16:43:29 2022 +0800

    [fix][broker] Fix wrong state for non-durable cursor (#14869)
    
    ### Motivation
    
    The current non-durable cursor does not have the correct state.
    For example, when the reader is created, I always see the cursor status as ``Uninitialized`` via the ``getInternalStats`` method.
    
    ```json
    {
      "reader-xxxxx" : {
          "markDeletePosition" : "19785:18718",
          "readPosition" : "19807:42735",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : -2257,
          "cursorLedger" : -1,
          "cursorLedgerLastEntry" : -1,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-03-24T20:03:51.85Z",
          "state" : "Uninitialized",
          "numberOfEntriesSinceFirstNotAckedMessage" : 744993,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : true,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
    }
    ```
    
    ### Modifications
    
    - Correct the cursor state.
    
    (cherry picked from commit b477557a6f6d36ae18f09d4edfa076e9092a17fe)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java  |  2 +-
 .../mledger/impl/NonDurableCursorImpl.java          |  4 ++--
 .../org/apache/pulsar/client/impl/ReaderTest.java   | 21 ++++++++++++++++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8c96f0e..f882444 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -236,7 +236,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         Closed // The managed cursor has been closed
     }
 
-    private static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER =
+    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER =
         AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state");
     protected volatile State state = null;
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 1d545bd..4625f5b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -61,7 +61,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
             // read-position
             recoverCursor(startCursorPosition);
         }
-
+        STATE_UPDATER.set(this, State.Open);
         log.info("[{}] Created non-durable cursor read-position={} mark-delete-position={}", ledger.getName(),
                 readPosition, markDeletePosition);
     }
@@ -110,7 +110,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
 
     @Override
     public void asyncClose(CloseCallback callback, Object ctx) {
-        // No-Op
+        STATE_UPDATER.set(this, State.Closed);
         callback.closeComplete(ctx);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2052646..53ae8f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -55,6 +54,8 @@ import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -602,4 +603,22 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         });
     }
 
+    @Test
+    public void testReaderCursorStatsCorrect() throws Exception {
+        final String readerNotAckTopic = "persistent://my-property/my-ns/testReaderCursorStatsCorrect";
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(readerNotAckTopic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(readerNotAckTopic);
+        Assert.assertEquals(internalStats.cursors.size(), 1);
+        String key = new ArrayList<>(internalStats.cursors.keySet()).get(0);
+        ManagedLedgerInternalStats.CursorStats cursor = internalStats.cursors.get(key);
+        Assert.assertEquals(cursor.state, "Open");
+        reader.close();
+        internalStats = admin.topics().getInternalStats(readerNotAckTopic);
+        Assert.assertEquals(internalStats.cursors.size(), 0);
+    }
+
 }

[pulsar] 02/04: [fix][broker] Fix topic policy reader close bug. (#14897)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ea98b46c631db5928c956eae5481a0ea1dab22a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Mar 27 22:07:03 2022 +0800

    [fix][broker] Fix topic policy reader close bug. (#14897)
    
    ### Motivation
    
    https://github.com/apache/pulsar/issues/14896 is flaky, after diving into the codes, I find it's a bug about closing topic policy reader.  We should use `ex.getCause` instead of `ex`.
    
    Stacktraceļ¼š
    ```
    2022-03-27T10:42:16,795+0800 [broker-client-shared-internal-executor-58-1] WARN  org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService - Read more topic polices exception, read again.
    java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Consumer already closed
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_291]
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_291]
    	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_291]
    	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) ~[?:1.8.0_291]
    	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) ~[?:1.8.0_291]
    	at org.apache.pulsar.client.impl.MultiTopicsReaderImpl.readNextAsync(MultiTopicsReaderImpl.java:140) ~[classes/:?]
    
    ```
    
    (cherry picked from commit 85c1ba50cb155111f969403a092affd536d9dcb9)
---
 .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java  | 4 +++-
 .../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java    | 5 +++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 964bdde..1c01814 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -364,7 +365,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 notifyListener(msg);
                 readMorePolicies(reader);
             } else {
-                if (ex instanceof PulsarClientException.AlreadyClosedException) {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
                     log.error("Read more topic policies exception, close the read now!", ex);
                     cleanCacheAndCloseReader(
                             reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 7dd02bd..bbd3cae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -74,6 +74,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1);
         Assert.assertEquals(partitions, PARTITIONS);
         Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
+        reader.close();
     }
 
     @Test(timeOut = 1000 * 60)
@@ -97,6 +98,10 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
                     .subscribeAsync());
         }
         FutureUtil.waitForAll(futureList).get();
+        // Close all the consumers after check
+        for (CompletableFuture<Consumer<byte[]>> consumer : futureList) {
+            consumer.join().close();
+        }
     }
 
 }

[pulsar] 04/04: # Motivation (#14895)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 970c11896e48de42f16987036486be49fce132ca
Author: ran <ga...@126.com>
AuthorDate: Wed Mar 30 06:30:38 2022 +0800

    # Motivation (#14895)
    
    Currently, the transaction buffer don't be closed when deleting topic.
    
    # Modification
    
    Close the transaction buffer when deleting topic.
    
    (cherry picked from commit e8d52fdb14dca62e7f0d7eb933bb7dc7fb3e916e)
---
 .../broker/service/persistent/PersistentTopic.java |   7 +-
 .../buffer/TransactionBufferCloseTest.java         | 120 +++++++++++++++++++++
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7052fc5..4f1b8cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1155,7 +1155,8 @@ public class PersistentTopic extends AbstractTopic
                     deleteTopicAuthenticationFuture.thenCompose(
                                     __ -> deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null))
                             .thenAccept(__ -> deleteTopicPolicies())
-                            .thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
+                            .thenCompose(__ -> transactionBufferCleanupAndClose())
+                            .whenComplete((v, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Error deleting topic", topic, ex);
                             unfenceTopicToResume();
@@ -3317,6 +3318,10 @@ public class PersistentTopic extends AbstractTopic
         return subscription.getPendingAckManageLedger();
     }
 
+    private CompletableFuture<Void> transactionBufferCleanupAndClose() {
+        return transactionBuffer.clearSnapshot().thenCompose(__ -> transactionBuffer.closeAsync());
+    }
+
     public long getLastDataMessagePublishedTimestamp() {
         return lastDataMessagePublishedTimestamp;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
new file mode 100644
index 0000000..43d31e7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Transaction buffer close test.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class TransactionBufferCloseTest extends TransactionTestBase {
+
+    @BeforeMethod
+    protected void setup() throws Exception {
+        setUpBase(1, 16, null, 0);
+        Awaitility.await().until(() -> ((PulsarClientImpl) pulsarClient)
+                .getTcClient().getState() == TransactionCoordinatorClient.State.READY);
+        admin.tenants().createTenant(TENANT,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "isPartition")
+    public Object[][] isPartition() {
+        return new Object[][]{
+                { true }, { false }
+        };
+    }
+
+    @Test(timeOut = 10_000, dataProvider = "isPartition")
+    public void deleteTopicCloseTransactionBufferTest(boolean isPartition) throws Exception {
+        int expectedCount = isPartition ? 30 : 1;
+        TopicName topicName = createAndLoadTopic(isPartition, expectedCount);
+        checkSnapshotPublisherCount(topicName.getNamespace(), expectedCount);
+        if (isPartition) {
+            admin.topics().deletePartitionedTopic(topicName.getPartitionedTopicName(), true);
+        } else {
+            admin.topics().delete(topicName.getPartitionedTopicName(), true);
+        }
+        checkSnapshotPublisherCount(topicName.getNamespace(), 0);
+    }
+
+    @Test(timeOut = 10_000, dataProvider = "isPartition")
+    public void unloadTopicCloseTransactionBufferTest(boolean isPartition) throws Exception {
+        int expectedCount = isPartition ? 30 : 1;
+        TopicName topicName = createAndLoadTopic(isPartition, expectedCount);
+        checkSnapshotPublisherCount(topicName.getNamespace(), expectedCount);
+        admin.topics().unload(topicName.getPartitionedTopicName());
+        checkSnapshotPublisherCount(topicName.getNamespace(), 0);
+    }
+
+    private TopicName createAndLoadTopic(boolean isPartition, int partitionCount)
+            throws PulsarAdminException, PulsarClientException {
+        String namespace = TENANT + "/ns-" + RandomStringUtils.randomAlphabetic(5);
+        admin.namespaces().createNamespace(namespace, 3);
+        String topic = namespace + "/tb-close-test-";
+        if (isPartition) {
+            admin.topics().createPartitionedTopic(topic, partitionCount);
+        }
+        pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create()
+                .close();
+        return TopicName.get(topic);
+    }
+
+    private void checkSnapshotPublisherCount(String namespace, int expectCount) throws PulsarAdminException {
+        TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.get(namespace),
+                EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        List<PublisherStats> publisherStatsList =
+                (List<PublisherStats>) admin.topics()
+                        .getStats(snTopicName.getPartitionedTopicName()).getPublishers();
+        Assert.assertEquals(publisherStatsList.size(), expectCount);
+    }
+
+}

[pulsar] 03/04: [fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a9938558a8a09414d9adadc520f4c797c750c8a7
Author: gaozhangmin <ga...@gmail.com>
AuthorDate: Tue Mar 29 01:00:20 2022 +0800

    [fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900)
    
    Fixes #14880
    
    ### Motivation
    
    Non durable cursor was not closed properly.
    
    ### Modifications
    For non durable cursor,  `cursor.asyncClose` did nothing. The proper way is `topic.getManagedLedger().asyncDeleteCursor`
    
    (cherry picked from commit 4e62ffc15714cfa49ed441f3ba7ededb866b9062)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java     | 13 ++++++++-----
 .../apache/pulsar/broker/transaction/TransactionTest.java   |  7 +++++--
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 66ce8f5..e2888d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.Codec;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
@@ -639,7 +640,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                 }
                             }
 
-                            closeCursor(managedCursor);
+                            closeCursor(SUBSCRIPTION_NAME);
                             callBack.recoverComplete();
                         }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
                                 .getExecutor(this)).exceptionally(e -> {
@@ -656,17 +657,19 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
             });
         }
 
-        private void closeCursor(ManagedCursor cursor) {
-            cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+        private void closeCursor(String subscriptionName) {
+            topic.getManagedLedger().asyncDeleteCursor(Codec.encode(subscriptionName),
+                    new AsyncCallbacks.DeleteCursorCallback() {
                 @Override
-                public void closeComplete(Object ctx) {
+                public void deleteCursorComplete(Object ctx) {
                     log.info("[{}]Transaction buffer snapshot recover cursor close complete.", topic.getName());
                 }
 
                 @Override
-                public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                     log.error("[{}]Transaction buffer snapshot recover cursor close fail.", topic.getName());
                 }
+
             }, null);
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 231c183..ae5e8c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -538,7 +538,7 @@ public class TransactionTest extends TransactionTestBase {
                 .getTopic("persistent://" + topic, false).get().get();
         persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
 
-        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
         doReturn("transaction-buffer-sub").when(managedCursor).getName();
         doReturn(true).when(managedCursor).hasMoreEntries();
         doAnswer(invocation -> {
@@ -579,6 +579,9 @@ public class TransactionTest extends TransactionTestBase {
         TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer3.getStats().state, "Ready"));
+        persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
+            assertTrue(internalStats.cursors.isEmpty());
+        });
         managedCursors.removeCursor("transaction-buffer-sub");
     }
 
@@ -893,4 +896,4 @@ public class TransactionTest extends TransactionTestBase {
         pulsarServiceList.forEach((pulsarService ->
                 pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
     }
-}
\ No newline at end of file
+}