You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/21 05:44:45 UTC
[pulsar] branch master updated: [fix][broker] Fix compaction subscription delete by inactive subscription check. (#20983)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 43cd86d6835 [fix][broker] Fix compaction subscription delete by inactive subscription check. (#20983)
43cd86d6835 is described below
commit 43cd86d6835b4b096739e510f9fa928a64e46ac9
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Mon Aug 21 13:44:39 2023 +0800
[fix][broker] Fix compaction subscription delete by inactive subscription check. (#20983)
---
.../broker/service/persistent/PersistentTopic.java | 4 +-
.../pulsar/broker/service/BrokerServiceTest.java | 66 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5169fe3b0eb..cf15a2601de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2737,7 +2737,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
- if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
+ if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()
+ || sub.isReplicated()
+ || isCompactionSubscription(subName)) {
return;
}
if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index fc25c919fe3..c61da7fc03b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -67,6 +67,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.http.HttpResponse;
@@ -77,6 +78,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
@@ -109,6 +111,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -1218,6 +1221,69 @@ public class BrokerServiceTest extends BrokerTestBase {
}
}
+ @Test
+ public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws Exception {
+ String namespace = "prop/test";
+
+ // set up broker set compaction threshold.
+ cleanup();
+ conf.setBrokerServiceCompactionThresholdInBytes(8);
+ setup();
+
+ try {
+ admin.namespaces().createNamespace(namespace);
+ } catch (PulsarAdminException.ConflictException e) {
+ // Ok.. (if test fails intermittently and namespace is already created)
+ }
+
+ // set enable subscription expiration.
+ admin.namespaces().setSubscriptionExpirationTime(namespace, 1);
+
+ String compactionInactiveTestTopic = "persistent://prop/test/testCompactionCursorShouldNotDelete";
+
+ admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic);
+
+ CompletableFuture<Optional<Topic>> topicCf =
+ pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true);
+
+ Optional<Topic> topicOptional = topicCf.get();
+ assertTrue(topicOptional.isPresent());
+
+ PersistentTopic topic = (PersistentTopic) topicOptional.get();
+
+ PersistentSubscription sub = (PersistentSubscription) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION);
+ assertNotNull(sub);
+
+ topic.checkCompaction();
+
+ Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction");
+ currentCompaction.setAccessible(true);
+ CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)currentCompaction.get(topic);
+
+ compactionFuture.get();
+
+ ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();
+
+ // make cursor last active time to very small to check if it will be deleted
+ Field cursorLastActiveField = ManagedCursorImpl.class.getDeclaredField("lastActive");
+ cursorLastActiveField.setAccessible(true);
+ cursorLastActiveField.set(cursor, 0);
+
+ // replace origin object. so we can check if subscription is deleted.
+ PersistentSubscription spySubscription = Mockito.spy(sub);
+ topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, spySubscription);
+
+ // trigger inactive check.
+ topic.checkInactiveSubscriptions();
+
+ // Compaction subscription should not call delete method.
+ Mockito.verify(spySubscription, Mockito.never()).delete();
+
+ // check if the subscription exist.
+ assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+
+ }
+
/**
* Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.