You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/03/09 03:02:15 UTC

[pulsar] branch branch-2.11 updated: [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 9f58da745ea [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
9f58da745ea is described below

commit 9f58da745ead56be6f61c2f2f656f5a1616bcd94
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Mar 9 10:38:21 2023 +0800

    [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
    
    Motivation: After a topic close or delete failure, the topic's fence state is reset so that the topic can get back to work, but the fence state of its subscriptions is not reset, which leaves consumers permanently unable to connect to the broker.
    Modifications: Reset the fence state of the subscriptions if the topic close or delete operation fails.
    (cherry picked from commit cdeef00c5f6a5bd3197b4ca6de0a0505b18835d8)
---
 .../service/persistent/PersistentSubscription.java | 39 +++++++++++----
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../service/persistent/PersistentTopicTest.java    | 55 ++++++++++++++++++++++
 3 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 53701649289..645f7402c7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -125,6 +125,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
     private final PendingAckHandle pendingAckHandle;
     private volatile Map<String, String> subscriptionProperties;
+    private volatile CompletableFuture<Void> fenceFuture;
 
     static {
         REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
@@ -891,7 +892,10 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
      */
     @Override
     public synchronized CompletableFuture<Void> disconnect() {
-        CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
+        if (fenceFuture != null){
+            return fenceFuture;
+        }
+        fenceFuture = new CompletableFuture<>();
 
         // block any further consumers on this subscription
         IS_FENCED_UPDATER.set(this, TRUE);
@@ -899,19 +903,38 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
                 .thenCompose(v -> close()).thenRun(() -> {
                     log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
-                    disconnectFuture.complete(null);
+                    fenceFuture.complete(null);
                 }).exceptionally(exception -> {
-                    IS_FENCED_UPDATER.set(this, FALSE);
-                    if (dispatcher != null) {
-                        dispatcher.reset();
-                    }
                     log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
                             exception);
-                    disconnectFuture.completeExceptionally(exception);
+                    fenceFuture.completeExceptionally(exception);
+                    resumeAfterFence();
                     return null;
                 });
+        return fenceFuture;
+    }
 
-        return disconnectFuture;
+    /**
+     * Resume subscription after topic deletion or close failure.
+     */
+    public synchronized void resumeAfterFence() {
+        // If "fenceFuture" is null, it means that "disconnect" has never been called.
+        if (fenceFuture != null) {
+            fenceFuture.whenComplete((ignore, ignoreEx) -> {
+                synchronized (PersistentSubscription.this) {
+                    try {
+                        if (IS_FENCED_UPDATER.compareAndSet(this, TRUE, FALSE)) {
+                            if (dispatcher != null) {
+                                dispatcher.reset();
+                            }
+                        }
+                        fenceFuture = null;
+                    } catch (Exception ex) {
+                        log.error("[{}] Resume subscription [{}] failure", topicName, subName, ex);
+                    }
+                }
+            });
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 61385e09d0d..4c7b7bb979c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2981,6 +2981,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private void unfenceTopicToResume() {
+        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
         isFenced = false;
         isClosingOrDeleting = false;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index a62e8762ac5..7a4172ca12c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -36,13 +39,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -347,4 +354,52 @@ public class PersistentTopicTest extends BrokerTestBase {
             Assert.assertEquals(topicLevelNum, 0);
         }
     }
+
+    @Test
+    public void testDeleteTopicFail() throws Exception {
+        final String fullyTopicName = "persistent://prop/ns-abc/" + "tp_"
+                + UUID.randomUUID().toString().replaceAll("-", "");
+        // Mock topic.
+        BrokerService brokerService = spy(pulsar.getBrokerService());
+        doReturn(brokerService).when(pulsar).getBrokerService();
+
+        // Create a sub, and send one message.
+        Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(fullyTopicName).subscriptionName("sub1")
+                .subscribe();
+        consumer1.close();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(fullyTopicName).create();
+        producer.send("1");
+        producer.close();
+
+        // Make a failed delete operation.
+        AtomicBoolean makeDeletedFailed = new AtomicBoolean(true);
+        PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopic(fullyTopicName, false).get().get();
+        doAnswer(invocation -> {
+            CompletableFuture future = (CompletableFuture) invocation.getArguments()[1];
+            if (makeDeletedFailed.get()) {
+                future.completeExceptionally(new RuntimeException("mock ex for test"));
+            } else {
+                future.complete(null);
+            }
+            return null;
+        }).when(brokerService)
+                .deleteTopicAuthenticationWithRetry(any(String.class), any(CompletableFuture.class), anyInt());
+        try {
+            persistentTopic.delete().get();
+        } catch (Exception e) {
+            org.testng.Assert.assertTrue(e instanceof ExecutionException);
+            org.testng.Assert.assertTrue(e.getCause() instanceof java.lang.RuntimeException);
+            org.testng.Assert.assertEquals(e.getCause().getMessage(), "mock ex for test");
+        }
+
+        // Assert topic works after deleting failure.
+        Consumer consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(fullyTopicName).subscriptionName("sub1")
+                .subscribe();
+        org.testng.Assert.assertEquals("1", consumer2.receive(2, TimeUnit.SECONDS).getValue());
+        consumer2.close();
+
+        // Make delete success.
+        makeDeletedFailed.set(false);
+        persistentTopic.delete().get();
+    }
 }