You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/30 17:31:39 UTC
[pulsar] branch master updated: Don't attempt to delete pending ack store unless transactions are enabled (#13041)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4672024 Don't attempt to delete pending ack store unless transactions are enabled (#13041)
4672024 is described below
commit 46720247d9a06daae9f8eae7740887c92406b2c3
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Nov 30 19:28:58 2021 +0200
Don't attempt to delete pending ack store unless transactions are enabled (#13041)
---
.../broker/service/persistent/PersistentTopic.java | 43 ++++++++++++----------
1 file changed, 24 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 25cf921..662b2b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1011,27 +1011,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
- getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
- .getTransactionPendingAckStoreSuffix(topic,
- Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
- new AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- asyncDeleteCursor(subscriptionName, unsubscribeFuture);
- }
- @Override
- public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- if (exception instanceof MetadataNotFoundException) {
- asyncDeleteCursor(subscriptionName, unsubscribeFuture);
- return;
- }
+ if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+ getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
+ .getTransactionPendingAckStoreSuffix(topic,
+ Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+ new AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ }
- unsubscribeFuture.completeExceptionally(exception);
- log.error("[{}][{}] Error deleting subscription pending ack store",
- topic, subscriptionName, exception);
- }
- }, null);
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ if (exception instanceof MetadataNotFoundException) {
+ asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ return;
+ }
+
+ unsubscribeFuture.completeExceptionally(exception);
+ log.error("[{}][{}] Error deleting subscription pending ack store",
+ topic, subscriptionName, exception);
+ }
+ }, null);
+ } else {
+ asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ }
return unsubscribeFuture;
}