You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2024/01/11 01:10:11 UTC
(pulsar) 01/02: [fix][broker] Delete compacted ledger when topic is deleted (#21745)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c5803106b7e555d15b310b4a5f4fb9b47eabc127
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Dec 29 14:39:45 2023 +0800
[fix][broker] Delete compacted ledger when topic is deleted (#21745)
---
.../broker/service/persistent/PersistentTopic.java | 61 ++++++++--
.../persistent/PulsarCompactorSubscription.java | 17 +++
.../apache/pulsar/client/impl/RawReaderImpl.java | 3 +-
.../pulsar/compaction/CompactedTopicImpl.java | 5 +
.../apache/pulsar/compaction/CompactionTest.java | 130 +++++++++++++++++++++
5 files changed, 203 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 48069cf5554..da36ad092ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -230,7 +230,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
protected final MessageDeduplication messageDeduplication;
private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
- private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
+ private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
+ COMPACTION_NEVER_RUN);
private TopicCompactionService topicCompactionService;
// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
@@ -1196,13 +1197,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
CompletableFuture<Void> unsubscribeFuture) {
PersistentSubscription persistentSubscription = subscriptions.get(subscriptionName);
if (persistentSubscription == null) {
- log.warn("[{}][{}] Can't find subscription, skip clear delayed message", topic, subscriptionName);
+ log.warn("[{}][{}] Can't find subscription, skip delete cursor", topic, subscriptionName);
unsubscribeFuture.complete(null);
return;
}
+
if (!isDelayedDeliveryEnabled()
|| !(brokerService.getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory)) {
- asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
return;
}
@@ -1217,7 +1219,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (ex != null) {
unsubscribeFuture.completeExceptionally(ex);
} else {
- asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
}
});
}
@@ -1227,6 +1229,29 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
dispatcher.clearDelayedMessages().whenComplete((__, ex) -> {
if (ex != null) {
unsubscribeFuture.completeExceptionally(ex);
+ } else {
+ asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
+ }
+ });
+ }
+
+ private void asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription subscription,
+ CompletableFuture<Void> unsubscribeFuture) {
+ final String subscriptionName = subscription.getName();
+ if ((!isCompactionSubscription(subscriptionName)) || !(subscription instanceof PulsarCompactorSubscription)) {
+ asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ return;
+ }
+
+ currentCompaction.handle((__, e) -> {
+ if (e != null) {
+ log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName);
+ }
+ return ((PulsarCompactorSubscription) subscription).cleanCompactedLedger();
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex);
+ unsubscribeFuture.completeExceptionally(ex);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
}
@@ -3427,17 +3452,29 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
+ if (!lock.readLock().tryLock()) {
+ log.info("[{}] Conflict topic-close, topic-delete, skip triggering compaction", topic);
+ return;
+ }
+ try {
+ if (isClosingOrDeleting) {
+ log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic);
+ return;
+ }
- if (strategicCompactionMap.containsKey(topic)) {
- currentCompaction = brokerService.pulsar().getStrategicCompactor()
- .compact(topic, strategicCompactionMap.get(topic));
- } else {
- currentCompaction = topicCompactionService.compact().thenApply(x -> null);
+ if (strategicCompactionMap.containsKey(topic)) {
+ currentCompaction = brokerService.pulsar().getStrategicCompactor()
+ .compact(topic, strategicCompactionMap.get(topic));
+ } else {
+ currentCompaction = topicCompactionService.compact().thenApply(x -> null);
+ }
+ } finally {
+ lock.readLock().unlock();
}
currentCompaction.whenComplete((ignore, ex) -> {
- if (ex != null){
- log.warn("[{}] Compaction failure.", topic, ex);
- }
+ if (ex != null) {
+ log.warn("[{}] Compaction failure.", topic, ex);
+ }
});
} else {
throw new AlreadyRunningException("Compaction already in progress");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
index dbb09f6ac39..fe13aeb572e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
@@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,5 +109,19 @@ public class PulsarCompactorSubscription extends PersistentSubscription {
}
}
+ CompletableFuture<Void> cleanCompactedLedger() {
+ final CompletableFuture<CompactedTopicContext> compactedTopicContextFuture =
+ ((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture();
+ if (compactedTopicContextFuture != null) {
+ return compactedTopicContextFuture.thenCompose(context -> {
+ long compactedLedgerId = context.getLedger().getId();
+ ((CompactedTopicImpl) compactedTopic).reset();
+ return compactedTopic.deleteCompactedLedger(compactedLedgerId);
+ });
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(PulsarCompactorSubscription.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 70bda888bf7..f6523241399 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -59,6 +59,7 @@ public class RawReaderImpl implements RawReader {
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+ consumerConfiguration.setAckReceiptEnabled(true);
consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);
@@ -122,7 +123,7 @@ public class RawReaderImpl implements RawReader {
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
- true
+ false
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 8794e2736d4..a8e124c84a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -330,6 +330,11 @@ public class CompactedTopicImpl implements CompactedTopic {
return Optional.ofNullable(this.compactionHorizon);
}
+ public void reset() {
+ this.compactionHorizon = null;
+ this.compactedTopicContext = null;
+ }
+
@Nullable
public CompletableFuture<CompactedTopicContext> getCompactedTopicContextFuture() {
return compactedTopicContext;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 9eda5479683..a1dfbe60558 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -46,21 +47,26 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -81,6 +87,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -2012,4 +2019,127 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
}
}
+
+ @Test
+ public void testDeleteCompactedLedger() throws Exception {
+ String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedger";
+
+ final String subName = "my-sub";
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();
+
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+ }
+ producer.flush();
+
+ compact(topicName);
+
+ MutableLong compactedLedgerId = new MutableLong(-1);
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+ Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+ compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
+ Assert.assertEquals(stats.compactedLedger.entries, 2L);
+ });
+
+ // delete compacted ledger
+ admin.topics().deleteSubscription(topicName, "__compaction");
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+ Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
+ Assert.assertEquals(stats.compactedLedger.entries, -1L);
+ assertThrows(BKException.BKNoSuchLedgerExistsException.class, () -> pulsarTestContext.getBookKeeperClient()
+ .openLedger(compactedLedgerId.getValue(), BookKeeper.DigestType.CRC32C, new byte[]{}));
+ });
+
+ compact(topicName);
+
+ MutableLong compactedLedgerId2 = new MutableLong(-1);
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+ Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+ compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
+ Assert.assertEquals(stats.compactedLedger.entries, 2L);
+ });
+
+ producer.close();
+ admin.topics().delete(topicName);
+
+ Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+ () -> pulsarTestContext.getBookKeeperClient().openLedger(
+ compactedLedgerId2.getValue(), BookKeeper.DigestType.CRC32, new byte[]{})));
+ }
+
+ @Test
+ public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
+ // Disable topic-level policies, since blocking the ack thread may also block the thread that deletes topic policies.
+ conf.setTopicLevelPoliciesEnabled(false);
+ restartBroker();
+
+ String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
+ .close();
+
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+ }
+ producer.flush();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentSubscription subscription = spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+ topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, subscription);
+
+ AtomicLong compactedLedgerId = new AtomicLong(-1);
+ AtomicBoolean pauseAck = new AtomicBoolean();
+ Mockito.doAnswer(invocationOnMock -> {
+ Map<String, Long> properties = (Map<String, Long>) invocationOnMock.getArguments()[2];
+ log.info("acknowledgeMessage properties: {}", properties);
+ compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
+ pauseAck.set(true);
+ while (pauseAck.get()) {
+ Thread.sleep(200);
+ }
+ return invocationOnMock.callRealMethod();
+ }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
+ CommandAck.AckType.Cumulative), Mockito.any());
+
+ admin.topics().triggerCompaction(topicName);
+
+ while (!pauseAck.get()) {
+ Thread.sleep(100);
+ }
+
+ CompletableFuture<Long> currentCompaction =
+ (CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, "currentCompaction", true);
+ CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
+ FieldUtils.writeDeclaredField(topic, "currentCompaction", spyCurrentCompaction, true);
+ currentCompaction.whenComplete((obj, throwable) -> {
+ if (throwable != null) {
+ spyCurrentCompaction.completeExceptionally(throwable);
+ } else {
+ spyCurrentCompaction.complete(obj);
+ }
+ });
+ Mockito.doAnswer(invocationOnMock -> {
+ pauseAck.set(false);
+ return invocationOnMock.callRealMethod();
+ }).when(spyCurrentCompaction).handle(Mockito.any());
+
+ admin.topics().delete(topicName, true);
+
+ Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+ () -> pulsarTestContext.getBookKeeperClient().openLedger(
+ compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
+ }
}