You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/04/21 03:54:44 UTC
[pulsar] branch master updated: Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e4a8de1eb96 Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
e4a8de1eb96 is described below
commit e4a8de1eb9605e36060f0740a0097203a21e34ef
Author: Shen Liu <li...@126.com>
AuthorDate: Thu Apr 21 11:54:34 2022 +0800
Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202)
Co-authored-by: druidliu <dr...@tencent.com>
Fixes #15196.
### Motivation
If the broker is configured with `topicFencingTimeoutSeconds` > 0 and a topic is triggered to close and then closes normally, `closeFencedTopicForcefully` should not be called.
### Modifications
Cancel the fenced-topic monitoring task when the topic closes normally, which cancels the pending scheduled call to `closeFencedTopicForcefully`.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Add `org.apache.pulsar.broker.service.PersistentTopicTest#testTopicCloseFencingTimeout`
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API: (yes / no)
- The schema: (yes / no / don't know)
- The default values of configurations: (yes / no)
- The wire protocol: (yes / no)
- The rest endpoints: (yes / no)
- The admin cli options: (yes / no)
- Anything that affects deployment: (yes / no / don't know)
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [ ] `doc-required`
- [x] `no-need-doc`
- [ ] `doc`
- [ ] `doc-added`
---
.../broker/service/persistent/PersistentTopic.java | 13 +++++++++----
.../pulsar/broker/service/PersistentTopicTest.java | 22 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ffeed8bd2ea..85ee429c026 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1309,6 +1309,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
unregisterTopicPolicyListener();
log.info("[{}] Topic closed", topic);
+ cancelFencedTopicMonitoringTask();
closeFuture.complete(null);
})
.exceptionally(ex -> {
@@ -2899,6 +2900,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return true;
}
+ private synchronized void cancelFencedTopicMonitoringTask() {
+ ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
+ if (monitoringTask != null && !monitoringTask.isDone()) {
+ monitoringTask.cancel(false);
+ }
+ }
+
private synchronized void fence() {
isFenced = true;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
@@ -2913,10 +2921,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private synchronized void unfence() {
isFenced = false;
- ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
- if (monitoringTask != null && !monitoringTask.isDone()) {
- monitoringTask.cancel(false);
- }
+ cancelFencedTopicMonitoringTask();
}
private void closeFencedTopicForcefully() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 19ee7f58c4b..d1aba97ee90 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2162,6 +2162,28 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
assertTrue((boolean) isClosingOrDeletingField.get(topic));
}
+ @Test
+ public void testTopicCloseFencingTimeout() throws Exception {
+ pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
+ Method fence = PersistentTopic.class.getDeclaredMethod("fence");
+ fence.setAccessible(true);
+ Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
+ fencedTopicMonitoringTaskField.setAccessible(true);
+
+ // create topic
+ PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
+
+ // fence topic to init fencedTopicMonitoringTask
+ fence.invoke(topic);
+
+ // close topic
+ topic.close().get();
+ assertFalse(brokerService.getTopicReference(successTopicName).isPresent());
+ ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
+ assertTrue(fencedTopicMonitoringTask.isDone());
+ assertTrue(fencedTopicMonitoringTask.isCancelled());
+ }
+
@Test
public void testGetDurableSubscription() throws Exception {
ManagedLedger mockLedger = mock(ManagedLedger.class);