You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/04/21 03:59:43 UTC

[pulsar] branch branch-2.10 updated: Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 58323b01b6b Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
58323b01b6b is described below

commit 58323b01b6b074b2335f580827281fef877d0f8e
Author: Shen Liu <li...@126.com>
AuthorDate: Thu Apr 21 11:54:34 2022 +0800

    Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
    
    Co-authored-by: druidliu <dr...@tencent.com>
    
    Fixes #15196.
    
    ### Motivation
    
    If broker having conf `topicFencingTimeoutSeconds`>0, a topic is trigged closed and closed normally, `closeFencedTopicForcefully` should not be called.
    
    ### Modifications
    
    Cancel fenced topic monitoring task if topic close normally, which cancel running `closeFencedTopicForcefully`.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
      - Add `org.apache.pulsar.broker.service.PersistentTopicTest#testTopicCloseFencingTimeout`
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API: (yes / no)
      - The schema: (yes / no / don't know)
      - The default values of configurations: (yes / no)
      - The wire protocol: (yes / no)
      - The rest endpoints: (yes / no)
      - The admin cli options: (yes / no)
      - Anything that affects deployment: (yes / no / don't know)
    
    ### Documentation
    
    Check the box below or label this PR directly.
    
    Need to update docs?
    
    - [ ] `doc-required`
    - [x] `no-need-doc`
    - [ ] `doc`
    - [ ] `doc-added`
    
    (cherry picked from commit e4a8de1eb9605e36060f0740a0097203a21e34ef)
---
 .../broker/service/persistent/PersistentTopic.java | 13 +++++++++----
 .../pulsar/broker/service/PersistentTopicTest.java | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3f7aafb70d4..e30049a098d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1284,6 +1284,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                                 unregisterTopicPolicyListener();
                                 log.info("[{}] Topic closed", topic);
+                                cancelFencedTopicMonitoringTask();
                                 closeFuture.complete(null);
                             })
                     .exceptionally(ex -> {
@@ -2874,6 +2875,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return true;
     }
 
+    private synchronized void cancelFencedTopicMonitoringTask() {
+        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
+        if (monitoringTask != null && !monitoringTask.isDone()) {
+            monitoringTask.cancel(false);
+        }
+    }
+
     private synchronized void fence() {
         isFenced = true;
         ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
@@ -2888,10 +2896,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     private synchronized void unfence() {
         isFenced = false;
-        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
-        if (monitoringTask != null && !monitoringTask.isDone()) {
-            monitoringTask.cancel(false);
-        }
+        cancelFencedTopicMonitoringTask();
     }
 
     private void closeFencedTopicForcefully() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index fa333bcfd9f..4e53819e44c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2148,6 +2148,28 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         assertTrue((boolean) isClosingOrDeletingField.get(topic));
     }
 
+    @Test
+    public void testTopicCloseFencingTimeout() throws Exception {
+        pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
+        Method fence = PersistentTopic.class.getDeclaredMethod("fence");
+        fence.setAccessible(true);
+        Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
+        fencedTopicMonitoringTaskField.setAccessible(true);
+
+        // create topic
+        PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+
+        // fence topic to init fencedTopicMonitoringTask
+        fence.invoke(topic);
+
+        // close topic
+        topic.close().get();
+        assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+        ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
+        assertTrue(fencedTopicMonitoringTask.isDone());
+        assertTrue(fencedTopicMonitoringTask.isCancelled());
+    }
+
     @Test
     public void testGetDurableSubscription() throws Exception {
         ManagedLedger mockLedger = mock(ManagedLedger.class);