You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/04/21 03:54:44 UTC

[pulsar] branch master updated: Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4a8de1eb96 Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
e4a8de1eb96 is described below

commit e4a8de1eb9605e36060f0740a0097203a21e34ef
Author: Shen Liu <li...@126.com>
AuthorDate: Thu Apr 21 11:54:34 2022 +0800

    Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
    
    Co-authored-by: druidliu <dr...@tencent.com>
    
    Fixes #15196.
    
    ### Motivation
    
    If the broker is configured with `topicFencingTimeoutSeconds` > 0 and a topic close is triggered and completes normally, `closeFencedTopicForcefully` should not be called.
    
    ### Modifications
    
    Cancel the fenced-topic monitoring task when the topic closes normally, which prevents the scheduled `closeFencedTopicForcefully` call from running.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
      - Add `org.apache.pulsar.broker.service.PersistentTopicTest#testTopicCloseFencingTimeout`
    
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API: (yes / no)
      - The schema: (yes / no / don't know)
      - The default values of configurations: (yes / no)
      - The wire protocol: (yes / no)
      - The rest endpoints: (yes / no)
      - The admin cli options: (yes / no)
      - Anything that affects deployment: (yes / no / don't know)
    
    ### Documentation
    
    Check the box below or label this PR directly.
    
    Need to update docs?
    
    - [ ] `doc-required`
    - [x] `no-need-doc`
    - [ ] `doc`
    - [ ] `doc-added`
---
 .../broker/service/persistent/PersistentTopic.java | 13 +++++++++----
 .../pulsar/broker/service/PersistentTopicTest.java | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ffeed8bd2ea..85ee429c026 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1309,6 +1309,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                                 unregisterTopicPolicyListener();
                                 log.info("[{}] Topic closed", topic);
+                                cancelFencedTopicMonitoringTask();
                                 closeFuture.complete(null);
                             })
                     .exceptionally(ex -> {
@@ -2899,6 +2900,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return true;
     }
 
+    private synchronized void cancelFencedTopicMonitoringTask() {
+        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
+        if (monitoringTask != null && !monitoringTask.isDone()) {
+            monitoringTask.cancel(false);
+        }
+    }
+
     private synchronized void fence() {
         isFenced = true;
         ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
@@ -2913,10 +2921,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     private synchronized void unfence() {
         isFenced = false;
-        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
-        if (monitoringTask != null && !monitoringTask.isDone()) {
-            monitoringTask.cancel(false);
-        }
+        cancelFencedTopicMonitoringTask();
     }
 
     private void closeFencedTopicForcefully() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 19ee7f58c4b..d1aba97ee90 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2162,6 +2162,28 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         assertTrue((boolean) isClosingOrDeletingField.get(topic));
     }
 
+    @Test
+    public void testTopicCloseFencingTimeout() throws Exception {
+        pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
+        Method fence = PersistentTopic.class.getDeclaredMethod("fence");
+        fence.setAccessible(true);
+        Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
+        fencedTopicMonitoringTaskField.setAccessible(true);
+
+        // create topic
+        PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+
+        // fence topic to init fencedTopicMonitoringTask
+        fence.invoke(topic);
+
+        // close topic
+        topic.close().get();
+        assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+        ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
+        assertTrue(fencedTopicMonitoringTask.isDone());
+        assertTrue(fencedTopicMonitoringTask.isCancelled());
+    }
+
     @Test
     public void testGetDurableSubscription() throws Exception {
         ManagedLedger mockLedger = mock(ManagedLedger.class);