You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/21 05:44:45 UTC

[pulsar] branch master updated: [fix][broker] Fix compaction subscription delete by inactive subscription check. (#20983)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 43cd86d6835 [fix][broker] Fix compaction subscription delete by inactive subscription check. (#20983)
43cd86d6835 is described below

commit 43cd86d6835b4b096739e510f9fa928a64e46ac9
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Mon Aug 21 13:44:39 2023 +0800

    [fix][broker] Fix compaction subscription delete by inactive subscription check. (#20983)
---
 .../broker/service/persistent/PersistentTopic.java |  4 +-
 .../pulsar/broker/service/BrokerServiceTest.java   | 66 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5169fe3b0eb..cf15a2601de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2737,7 +2737,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
             if (expirationTimeMillis > 0) {
                 subscriptions.forEach((subName, sub) -> {
-                    if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
+                    if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()
+                            || sub.isReplicated()
+                            || isCompactionSubscription(subName)) {
                         return;
                     }
                     if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index fc25c919fe3..c61da7fc03b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -67,6 +67,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.http.HttpResponse;
@@ -77,6 +78,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
@@ -109,6 +111,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.compaction.Compactor;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -1218,6 +1221,69 @@ public class BrokerServiceTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws Exception {
+        String namespace = "prop/test";
+
+        // set up broker set compaction threshold.
+        cleanup();
+        conf.setBrokerServiceCompactionThresholdInBytes(8);
+        setup();
+
+        try {
+            admin.namespaces().createNamespace(namespace);
+        } catch (PulsarAdminException.ConflictException e) {
+            // Ok.. (if test fails intermittently and namespace is already created)
+        }
+
+        // set enable subscription expiration.
+        admin.namespaces().setSubscriptionExpirationTime(namespace, 1);
+
+        String compactionInactiveTestTopic = "persistent://prop/test/testCompactionCursorShouldNotDelete";
+
+        admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic);
+
+        CompletableFuture<Optional<Topic>> topicCf =
+                pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true);
+
+        Optional<Topic> topicOptional = topicCf.get();
+        assertTrue(topicOptional.isPresent());
+
+        PersistentTopic topic = (PersistentTopic) topicOptional.get();
+
+        PersistentSubscription sub = (PersistentSubscription) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION);
+        assertNotNull(sub);
+
+        topic.checkCompaction();
+
+        Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction");
+        currentCompaction.setAccessible(true);
+        CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)currentCompaction.get(topic);
+
+        compactionFuture.get();
+
+        ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();
+
+        // make cursor last active time to very small to check if it will be deleted
+        Field cursorLastActiveField = ManagedCursorImpl.class.getDeclaredField("lastActive");
+        cursorLastActiveField.setAccessible(true);
+        cursorLastActiveField.set(cursor, 0);
+
+        // replace origin object. so we can check if subscription is deleted.
+        PersistentSubscription spySubscription = Mockito.spy(sub);
+        topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, spySubscription);
+
+        // trigger inactive check.
+        topic.checkInactiveSubscriptions();
+
+        // Compaction subscription should not call delete method.
+        Mockito.verify(spySubscription, Mockito.never()).delete();
+
+        // check if the subscription exist.
+        assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+
+    }
+
     /**
      * Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
      * it should not introduce deadlock while performing it.