You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2024/01/11 01:10:11 UTC

(pulsar) 01/02: [fix][broker] Delete compacted ledger when topic is deleted (#21745)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c5803106b7e555d15b310b4a5f4fb9b47eabc127
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Dec 29 14:39:45 2023 +0800

    [fix][broker] Delete compacted ledger when topic is deleted (#21745)
---
 .../broker/service/persistent/PersistentTopic.java |  61 ++++++++--
 .../persistent/PulsarCompactorSubscription.java    |  17 +++
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   3 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |   5 +
 .../apache/pulsar/compaction/CompactionTest.java   | 130 +++++++++++++++++++++
 5 files changed, 203 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 48069cf5554..da36ad092ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -230,7 +230,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     protected final MessageDeduplication messageDeduplication;
 
     private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
-    private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
+    private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
+            COMPACTION_NEVER_RUN);
     private TopicCompactionService topicCompactionService;
 
     // TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
@@ -1196,13 +1197,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                                                           CompletableFuture<Void> unsubscribeFuture) {
         PersistentSubscription persistentSubscription = subscriptions.get(subscriptionName);
         if (persistentSubscription == null) {
-            log.warn("[{}][{}] Can't find subscription, skip clear delayed message", topic, subscriptionName);
+            log.warn("[{}][{}] Can't find subscription, skip delete cursor", topic, subscriptionName);
             unsubscribeFuture.complete(null);
             return;
         }
+
         if (!isDelayedDeliveryEnabled()
                 || !(brokerService.getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory)) {
-            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
             return;
         }
 
@@ -1217,7 +1219,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     if (ex != null) {
                         unsubscribeFuture.completeExceptionally(ex);
                     } else {
-                        asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                        asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
                     }
                 });
             }
@@ -1227,6 +1229,29 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         dispatcher.clearDelayedMessages().whenComplete((__, ex) -> {
             if (ex != null) {
                 unsubscribeFuture.completeExceptionally(ex);
+            } else {
+                asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
+            }
+        });
+    }
+
+    private void asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription subscription,
+                                                            CompletableFuture<Void> unsubscribeFuture) {
+        final String subscriptionName = subscription.getName();
+        if ((!isCompactionSubscription(subscriptionName)) || !(subscription instanceof PulsarCompactorSubscription)) {
+            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            return;
+        }
+
+        currentCompaction.handle((__, e) -> {
+            if (e != null) {
+                log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName);
+            }
+            return ((PulsarCompactorSubscription) subscription).cleanCompactedLedger();
+        }).whenComplete((__, ex) -> {
+            if (ex != null) {
+                log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex);
+                unsubscribeFuture.completeExceptionally(ex);
             } else {
                 asyncDeleteCursor(subscriptionName, unsubscribeFuture);
             }
@@ -3427,17 +3452,29 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public synchronized void triggerCompaction()
             throws PulsarServerException, AlreadyRunningException {
         if (currentCompaction.isDone()) {
+            if (!lock.readLock().tryLock()) {
+                log.info("[{}] Conflict topic-close, topic-delete, skip triggering compaction", topic);
+                return;
+            }
+            try {
+                if (isClosingOrDeleting) {
+                    log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic);
+                    return;
+                }
 
-            if (strategicCompactionMap.containsKey(topic)) {
-                currentCompaction = brokerService.pulsar().getStrategicCompactor()
-                        .compact(topic, strategicCompactionMap.get(topic));
-            } else {
-                currentCompaction = topicCompactionService.compact().thenApply(x -> null);
+                if (strategicCompactionMap.containsKey(topic)) {
+                    currentCompaction = brokerService.pulsar().getStrategicCompactor()
+                            .compact(topic, strategicCompactionMap.get(topic));
+                } else {
+                    currentCompaction = topicCompactionService.compact().thenApply(x -> null);
+                }
+            } finally {
+                lock.readLock().unlock();
             }
             currentCompaction.whenComplete((ignore, ex) -> {
-               if (ex != null){
-                   log.warn("[{}] Compaction failure.", topic, ex);
-               }
+                if (ex != null) {
+                    log.warn("[{}] Compaction failure.", topic, ex);
+                }
             });
         } else {
             throw new AlreadyRunningException("Compaction already in progress");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
index dbb09f6ac39..fe13aeb572e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
@@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,5 +109,19 @@ public class PulsarCompactorSubscription extends PersistentSubscription {
         }
     }
 
+    /** Once the compaction context resolves, resets the compacted topic and deletes its ledger; else a no-op. */
+    CompletableFuture<Void> cleanCompactedLedger() {
+        final CompletableFuture<CompactedTopicContext> contextFuture =
+                ((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture();
+        if (contextFuture == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return contextFuture.thenCompose(ctx -> {
+            final long ledgerId = ctx.getLedger().getId();
+            ((CompactedTopicImpl) compactedTopic).reset();
+            return compactedTopic.deleteCompactedLedger(ledgerId);
+        });
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PulsarCompactorSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 70bda888bf7..f6523241399 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -59,6 +59,7 @@ public class RawReaderImpl implements RawReader {
         consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
         consumerConfiguration.setReadCompacted(true);
         consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        consumerConfiguration.setAckReceiptEnabled(true);
 
         consumer = new RawConsumerImpl(client, consumerConfiguration,
                                        consumerFuture);
@@ -122,7 +123,7 @@ public class RawReaderImpl implements RawReader {
                     MessageId.earliest,
                     0 /* startMessageRollbackDurationInSec */,
                     Schema.BYTES, null,
-                    true
+                    false
             );
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 8794e2736d4..a8e124c84a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -330,6 +330,11 @@ public class CompactedTopicImpl implements CompactedTopic {
         return Optional.ofNullable(this.compactionHorizon);
     }
 
+    public void reset() { // clears the compaction horizon and the compacted-topic context
+        this.compactionHorizon = null; // no compaction horizon is recorded any more
+        this.compactedTopicContext = null; // getCompactedTopicContextFuture() now returns null
+    }
+
     @Nullable
     public CompletableFuture<CompactedTopicContext> getCompactedTopicContextFuture() {
         return compactedTopicContext;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 9eda5479683..a1dfbe60558 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -46,21 +47,26 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -81,6 +87,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -2012,4 +2019,127 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test
+    public void testDeleteCompactedLedger() throws Exception { // ledger must go with the subscription or topic
+        String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedger";
+
+        final String subName = "my-sub";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();
+
+        for (int i = 0; i < 10; i++) { // 10 messages spread over 2 keys ("0" and "1")
+            producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+        }
+        producer.flush();
+
+        compact(topicName); // first compaction
+
+        MutableLong compactedLedgerId = new MutableLong(-1);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            compactedLedgerId.setValue(stats.compactedLedger.ledgerId); // remembered for the checks below
+            Assert.assertEquals(stats.compactedLedger.entries, 2L); // 2 keys -> 2 compacted entries
+        });
+
+        // Deleting the compaction subscription is expected to delete the compacted ledger as well.
+        admin.topics().deleteSubscription(topicName, "__compaction");
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+            Assert.assertEquals(stats.compactedLedger.ledgerId, -1L); // stats report no compacted ledger
+            Assert.assertEquals(stats.compactedLedger.entries, -1L);
+            assertThrows(BKException.BKNoSuchLedgerExistsException.class, () -> pulsarTestContext.getBookKeeperClient()
+                        .openLedger(compactedLedgerId.getValue(), BookKeeper.DigestType.CRC32C, new byte[]{}));
+        });
+
+        compact(topicName); // second compaction, expected to produce a compacted ledger again
+
+        MutableLong compactedLedgerId2 = new MutableLong(-1);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
+            Assert.assertEquals(stats.compactedLedger.entries, 2L);
+        });
+
+        producer.close();
+        admin.topics().delete(topicName); // deleting the topic must remove the second ledger too
+
+        Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+                () -> pulsarTestContext.getBookKeeperClient().openLedger(
+                        compactedLedgerId2.getValue(), BookKeeper.DigestType.CRC32, new byte[]{})));
+    }
+
+    @Test
+    public void testDeleteCompactedLedgerWithSlowAck() throws Exception { // topic deleted while the ack stalls
+        // Disable topic-level policies: blocking the ack thread may also block the deletion of topic policies.
+        conf.setTopicLevelPoliciesEnabled(false);
+        restartBroker();
+
+        String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
+                .close(); // subscribes under the compaction subscription name, then closes the consumer
+
+        for (int i = 0; i < 10; i++) { // 10 messages spread over 2 keys ("0" and "1")
+            producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+        }
+        producer.flush();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription subscription = spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+        topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, subscription); // register the spy
+
+        AtomicLong compactedLedgerId = new AtomicLong(-1);
+        AtomicBoolean pauseAck = new AtomicBoolean();
+        Mockito.doAnswer(invocationOnMock -> { // stub: stall cumulative acks on the spied subscription
+            Map<String, Long> properties = (Map<String, Long>) invocationOnMock.getArguments()[2];
+            log.info("acknowledgeMessage properties: {}", properties);
+            compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); // id from the ack
+            pauseAck.set(true); // tell the test thread that the ack is now being held
+            while (pauseAck.get()) { // hold the ack until the flag is cleared
+                Thread.sleep(200);
+            }
+            return invocationOnMock.callRealMethod();
+        }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
+                CommandAck.AckType.Cumulative), Mockito.any());
+
+        admin.topics().triggerCompaction(topicName);
+
+        while (!pauseAck.get()) { // wait until the ack stub has been entered
+            Thread.sleep(100);
+        }
+
+        CompletableFuture<Long> currentCompaction =
+                (CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, "currentCompaction", true);
+        CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction); // written back to the topic below
+        FieldUtils.writeDeclaredField(topic, "currentCompaction", spyCurrentCompaction, true);
+        currentCompaction.whenComplete((obj, throwable) -> { // mirror the real outcome onto the spy
+            if (throwable != null) {
+                spyCurrentCompaction.completeExceptionally(throwable);
+            } else {
+                spyCurrentCompaction.complete(obj);
+            }
+        });
+        Mockito.doAnswer(invocationOnMock -> { // a handle() call on the spy releases the stalled ack
+            pauseAck.set(false);
+            return invocationOnMock.callRealMethod();
+        }).when(spyCurrentCompaction).handle(Mockito.any());
+
+        admin.topics().delete(topicName, true);
+
+        Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+                () -> pulsarTestContext.getBookKeeperClient().openLedger(
+                        compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
+    }
 }