You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2023/04/15 05:42:19 UTC

[pulsar] branch master updated: [fix][broker] Fix avoid future of clear delayed message can't complete (#20075)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6152cc82ac8 [fix][broker] Fix avoid future of clear delayed message can't complete (#20075)
6152cc82ac8 is described below

commit 6152cc82ac8a344cfc2539c56f15cec6f4a1cff7
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Apr 15 13:42:07 2023 +0800

    [fix][broker] Fix avoid future of clear delayed message can't complete (#20075)
---
 .../BucketDelayedDeliveryTrackerFactory.java       | 28 +++++++++++++
 .../pulsar/broker/delayed/bucket/Bucket.java       |  3 +-
 .../bucket/BucketDelayedDeliveryTracker.java       |  4 +-
 .../PersistentDispatcherMultipleConsumers.java     | 17 ++++----
 .../broker/service/persistent/PersistentTopic.java | 26 +++++++-----
 .../persistent/BucketDelayedDeliveryTest.java      | 48 ++++++++++++++++++++++
 6 files changed, 102 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index f0feb8b27d6..6a00bfd1995 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -21,13 +21,19 @@ package org.apache.pulsar.broker.delayed;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
 import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.util.FutureUtil;
 
 public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
 
@@ -72,6 +78,28 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack
                 delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
     }
 
+    /**
+     * Clean up residual snapshot data.
+     * If tracker has not been created or has been closed, then we can't clean up the snapshot with `tracker.clear`,
+     * this method can clean up the residual snapshots without creating a tracker.
+     */
+    public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor cursor) {
+        Map<String, String> cursorProperties = cursor.getCursorProperties();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
+        cursorProperties.forEach((k, v) -> {
+            if (k != null && v != null && k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) {
+                CompletableFuture<Void> future = sequencer.sequential(() -> {
+                    return cursor.removeCursorProperty(k)
+                            .thenCompose(__ -> bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)));
+                });
+                futures.add(future);
+            }
+        });
+
+        return FutureUtil.waitForAll(futures);
+    }
+
     @Override
     public void close() throws Exception {
         if (bucketSnapshotStorage != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index 5b7023be503..4d7d3aa512b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.broker.delayed.bucket;
 
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
+import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +41,6 @@ import org.roaringbitmap.RoaringBitmap;
 @AllArgsConstructor
 abstract class Bucket {
 
-    static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
     static final String DELIMITER = "_";
     static final int MaxRetryTimes = 3;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index a34bd51af98..6ead1e207b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
 import com.google.common.annotations.VisibleForTesting;
@@ -66,6 +66,8 @@ import org.roaringbitmap.RoaringBitmap;
 @ThreadSafe
 public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {
 
+    public static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
+
     static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null);
 
     static final int AsyncOperationTimeoutSeconds = 60;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4ac755860fc..c60b4562bf1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -49,6 +49,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
@@ -1098,19 +1099,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             return CompletableFuture.completedFuture(null);
         }
 
-        if (delayedDeliveryTracker.isEmpty() && topic.getBrokerService()
-                .getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory) {
-            synchronized (this) {
-                if (delayedDeliveryTracker.isEmpty()) {
-                    delayedDeliveryTracker = Optional
-                            .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
-                }
-            }
-        }
-
         if (delayedDeliveryTracker.isPresent()) {
             return this.delayedDeliveryTracker.get().clear();
         } else {
+            DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory =
+                    topic.getBrokerService().getDelayedDeliveryTrackerFactory();
+            if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory
+                    bucketDelayedDeliveryTrackerFactory) {
+                return bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor);
+            }
             return CompletableFuture.completedFuture(null);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0f5e6043981..fe181bb1c01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -81,6 +81,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -1169,15 +1170,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         Dispatcher dispatcher = persistentSubscription.getDispatcher();
-        final Dispatcher temporaryDispatcher;
         if (dispatcher == null) {
-            log.info("[{}][{}] Dispatcher is null, try to create temporary dispatcher to clear delayed message", topic,
-                    subscriptionName);
-            dispatcher = temporaryDispatcher =
-                    new PersistentDispatcherMultipleConsumers(this, persistentSubscription.cursor,
-                            persistentSubscription);
-        } else {
-            temporaryDispatcher = null;
+            DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory =
+                    brokerService.getDelayedDeliveryTrackerFactory();
+            if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory
+                    bucketDelayedDeliveryTrackerFactory) {
+                ManagedCursor cursor = persistentSubscription.getCursor();
+                bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor).whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        unsubscribeFuture.completeExceptionally(ex);
+                    } else {
+                        asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                    }
+                });
+            }
+            return;
         }
 
         dispatcher.clearDelayedMessages().whenComplete((__, ex) -> {
@@ -1186,9 +1193,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             } else {
                 asyncDeleteCursor(subscriptionName, unsubscribeFuture);
             }
-            if (temporaryDispatcher != null) {
-                temporaryDispatcher.close();
-            }
         });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 5480a2e7a70..0a82b2b4c3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -305,4 +305,52 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
         assertTrue(namespaceMetric.isPresent());
         assertEquals(6, namespaceMetric.get().value);
     }
+
+    @Test
+    public void testDelete() throws Exception {
+        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelete");
+
+        @Cleanup
+        Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 1000; i++) {
+            producer.newMessage()
+                    .value("msg")
+                    .deliverAfter(1, TimeUnit.HOURS)
+                    .send();
+        }
+
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription("sub").getDispatcher();
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1000));
+
+        Map<String, String> cursorProperties =
+                ((PersistentDispatcherMultipleConsumers) dispatcher).getCursor().getCursorProperties();
+        List<Long> bucketIds = cursorProperties.entrySet().stream()
+                .filter(x -> x.getKey().startsWith(CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket")).map(
+                        x -> Long.valueOf(x.getValue())).toList();
+
+        assertTrue(bucketIds.size() > 0);
+
+        admin.topics().delete(topic, true);
+
+        for (Long bucketId : bucketIds) {
+            try {
+                LedgerHandle ledgerHandle =
+                        pulsarTestContext.getBookKeeperClient()
+                                .openLedger(bucketId, BookKeeper.DigestType.CRC32C, new byte[]{});
+                Assert.fail("Should fail");
+            } catch (BKException.BKNoSuchLedgerExistsException e) {
+                // ignore it
+            }
+        }
+    }
 }