You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/30 17:31:39 UTC

[pulsar] branch master updated: Don't attempt to delete pending ack store unless transactions are enabled (#13041)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4672024  Don't attempt to delete pending ack store unless transactions are enabled (#13041)
4672024 is described below

commit 46720247d9a06daae9f8eae7740887c92406b2c3
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Nov 30 19:28:58 2021 +0200

    Don't attempt to delete pending ack store unless transactions are enabled (#13041)
---
 .../broker/service/persistent/PersistentTopic.java | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 25cf921..662b2b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1011,27 +1011,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> unsubscribe(String subscriptionName) {
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
-        getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
-                .getTransactionPendingAckStoreSuffix(topic,
-                        Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
-                new AsyncCallbacks.DeleteLedgerCallback() {
-            @Override
-            public void deleteLedgerComplete(Object ctx) {
-                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-            }
 
-            @Override
-            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                if (exception instanceof MetadataNotFoundException) {
-                    asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-                    return;
-                }
+        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+            getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
+                            .getTransactionPendingAckStoreSuffix(topic,
+                                    Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+                    new AsyncCallbacks.DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                        }
 
-                unsubscribeFuture.completeExceptionally(exception);
-                log.error("[{}][{}] Error deleting subscription pending ack store",
-                        topic, subscriptionName, exception);
-            }
-        }, null);
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            if (exception instanceof MetadataNotFoundException) {
+                                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                                return;
+                            }
+
+                            unsubscribeFuture.completeExceptionally(exception);
+                            log.error("[{}][{}] Error deleting subscription pending ack store",
+                                    topic, subscriptionName, exception);
+                        }
+                    }, null);
+        } else {
+            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+        }
 
         return unsubscribeFuture;
     }