You are viewing a plain text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain text version.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 02:18:34 UTC

[pulsar] 02/16: Don't attempt to delete pending ack store unless transactions are enabled (#13041)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dfcdacf40ecacec45e56a00d577866b014f7fa67
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Nov 30 19:28:58 2021 +0200

    Don't attempt to delete pending ack store unless transactions are enabled (#13041)
    
    (cherry picked from commit 46720247d9a06daae9f8eae7740887c92406b2c3)
---
 .../broker/service/persistent/PersistentTopic.java | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 46c9dfd..053d72d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -979,27 +979,32 @@ public class PersistentTopic extends AbstractTopic
     @Override
     public CompletableFuture<Void> unsubscribe(String subscriptionName) {
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
-        getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
-                .getTransactionPendingAckStoreSuffix(topic,
-                        Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
-                new AsyncCallbacks.DeleteLedgerCallback() {
-            @Override
-            public void deleteLedgerComplete(Object ctx) {
-                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-            }
 
-            @Override
-            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                if (exception instanceof MetadataNotFoundException) {
-                    asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-                    return;
-                }
+        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+            getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
+                            .getTransactionPendingAckStoreSuffix(topic,
+                                    Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+                    new AsyncCallbacks.DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                        }
 
-                unsubscribeFuture.completeExceptionally(exception);
-                log.error("[{}][{}] Error deleting subscription pending ack store",
-                        topic, subscriptionName, exception);
-            }
-        }, null);
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            if (exception instanceof MetadataNotFoundException) {
+                                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                                return;
+                            }
+
+                            unsubscribeFuture.completeExceptionally(exception);
+                            log.error("[{}][{}] Error deleting subscription pending ack store",
+                                    topic, subscriptionName, exception);
+                        }
+                    }, null);
+        } else {
+            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+        }
 
         return unsubscribeFuture;
     }