You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/10/19 22:13:17 UTC

[pulsar] branch master updated: Fix message TTL on Key_Shared subscription and Fix ordering issue when replay messages. (#8292)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52e441f  Fix message TTL on Key_Shared subscription and Fix ordering issue when replay messages. (#8292)
52e441f is described below

commit 52e441f29d6bf50cee051c45d7686a7d57147770
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Oct 20 06:12:41 2020 +0800

    Fix message TTL on Key_Shared subscription and Fix ordering issue when replay messages. (#8292)
---
 .../persistent/PersistentMessageExpiryMonitor.java | 18 +++--
 .../service/persistent/PersistentReplicator.java   |  6 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 41 ++++++++---
 .../service/persistent/PersistentSubscription.java |  2 +-
 .../service/PersistentMessageFinderTest.java       |  4 +-
 .../client/api/KeySharedSubscriptionTest.java      | 80 ++++++++++++++++++++++
 6 files changed, 129 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index b888f10..02d5a52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.stats.Rate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
     private final String topicName;
     private final Rate msgExpired;
     private final boolean autoSkipNonRecoverableData;
+    private final PersistentSubscription subscription;
 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
@@ -48,14 +50,15 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
     private static final AtomicIntegerFieldUpdater<PersistentMessageExpiryMonitor> expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
             .newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress");
 
-    public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor) {
+    public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor, PersistentSubscription subscription) {
         this.topicName = topicName;
         this.cursor = cursor;
         this.subName = subscriptionName;
+        this.subscription = subscription;
         this.msgExpired = new Rate();
-        this.autoSkipNonRecoverableData = cursor.getManagedLedger() != null  // check to avoid test failures
-                ? cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
-                : false;
+        // Null-check the managed ledger: some tests construct the monitor with a cursor that has none.
+        this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
+                && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
     }
 
     public void expireMessages(int messageTTLInSeconds) {
@@ -64,7 +67,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
                     messageTTLInSeconds);
 
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
-                MessageImpl msg = null;
+                MessageImpl<?> msg = null;
                 try {
                     msg = MessageImpl.deserialize(entry.getDataBuffer());
                     return msg.isExpired(messageTTLInSeconds);
@@ -102,7 +105,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
             long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
             msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
             updateRates();
-
+            // Key_Shared: the mark-delete position may have advanced, so let the dispatcher re-check and resume dispatch.
+            if (subscription != null && subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) {
+                subscription.getDispatcher().acknowledgementWasProcessed();
+            }
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Mark deleted {} messages", topicName, subName, numMessagesExpired);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index ec326b9..35f8199 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -113,7 +113,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         this.ledger = cursor.getManagedLedger();
         this.cursor = cursor;
         this.topic = topic;
-        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null /* no subscription */);
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         PENDING_MESSAGES_UPDATER.set(this, 0);
 
@@ -196,7 +196,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         if (cursor != null) {
             log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster);
             if (expiryMonitor == null) {
-                this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+                this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null /* no subscription */);
             }
             return CompletableFuture.completedFuture(null);
         }
@@ -206,7 +206,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster);
                 PersistentReplicator.this.cursor = cursor;
-                PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+                PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null /* no subscription */);
                 res.complete(null);
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 1585cae..c7d08eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,14 +18,13 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import com.google.common.base.Preconditions;
-
 import io.netty.util.concurrent.FastThreadLocal;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,7 +62,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
      * This means that, in order to preserve ordering, new consumers can only receive old
      * messages, until the mark-delete position will move past this point.
      */
-    private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
+    private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
 
     private final Set<Consumer> stuckConsumers;
     private final Set<Consumer> nextStuckConsumers;
@@ -73,7 +72,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         super(topic, cursor, subscription);
 
         this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
-        this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();
+        this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
         this.stuckConsumers = new HashSet<>();
         this.nextStuckConsumers = new HashSet<>();
 
@@ -112,6 +111,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // If this was the 1st consumer, or if all the messages are already acked, then we
         // don't need to do anything special
         if (!allowOutOfOrderDelivery
+                && recentlyJoinedConsumers != null
                 && consumerList.size() > 1
                 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
             recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
@@ -122,8 +122,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
         super.removeConsumer(consumer);
         selector.removeConsumer(consumer);
-
-        recentlyJoinedConsumers.remove(consumer);
+        if (recentlyJoinedConsumers != null) {
+            recentlyJoinedConsumers.remove(consumer);
+        }
     }
 
     private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
@@ -169,7 +170,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             int entriesWithSameKeyCount = entriesWithSameKey.size();
             final int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
-            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC);
+            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, readType);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} with messages num {}, read type is {}",
                         name, consumer.consumerName(), messagesForC, readType);
@@ -228,7 +229,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         stuckConsumers.clear();
 
-        if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
+        if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
             // This means, that all the messages we've just read cannot be dispatched right now.
             // This condition can only happen when:
             //  1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
@@ -251,13 +252,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         }
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
+    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages, ReadType readType) {
         if (maxMessages == 0) {
             // the consumer was stuck
             nextStuckConsumers.add(consumer);
             return 0;
         }
 
+        if (recentlyJoinedConsumers == null) {
+            return maxMessages;
+        }
+
         PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
         if (maxReadPosition == null) {
             // stop to dispatch by stuckConsumers
@@ -280,6 +285,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             return maxMessages;
         }
 
+        // On Replay, avoid sending a recently joined consumer messages that are held back for other consumers.
+        // For example, given 10 messages [0,1,2,3,4,5,6,7,8,9]:
+        // consumer0 gets messages 0 and 1 but does not ack message 0; then consumer1 joins.
+        // When consumer1 is selected for messages 2,3, the broker does not dispatch them to consumer1
+        // because the mark-delete position has not moved forward, so 2,3 are stored in the redelivery tracker.
+        // Now consumer2 joins and reads new messages from the cursor, so its recently-joined position is 4.
+        // Because there are messages to redeliver, the broker replays [2,3] first; both are below position 4,
+        // so they would be dispatched to consumer2 although they must not be. Hence maxReadPosition is capped at
+        // the first-inserted (earliest-joined) entry; this assumes join positions do not decrease over time.
+
+        if (readType == ReadType.Replay) {
+            PositionImpl minReadPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next();
+            if (minReadPositionForRecentJoinedConsumer != null && minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
+                maxReadPosition = minReadPositionForRecentJoinedConsumer;
+            }
+        }
         // Here, the consumer is one that has recently joined, so we can only send messages that were
         // published before it has joined.
         for (int i = 0; i < maxMessages; i++) {
@@ -295,7 +316,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     @Override
     public synchronized void acknowledgementWasProcessed() {
-        if (!recentlyJoinedConsumers.isEmpty()) {
+        if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()) {
             // After we process acks, we need to check whether the mark-delete position was advanced and we can finally
             // read more messages. It's safe to call readMoreEntries() multiple times.
             readMoreEntries();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 20ba166..f6d6917 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -147,7 +147,7 @@ public class PersistentSubscription implements Subscription {
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
-        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
+        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); // lets expiry re-trigger Key_Shared dispatch
         this.setReplicated(replicated);
         IS_FENCED_UPDATER.set(this, FALSE);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index a393ae3..36ca8e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -187,7 +187,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
                 });
         assertTrue(ex.get());
 
-        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
+        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null /* no subscription */);
         monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
                 Optional.empty(), null);
         Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
@@ -237,7 +237,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         bkc.deleteLedger(ledgers.get(1).getLedgerId());
         bkc.deleteLedger(ledgers.get(2).getLedgerId());
 
-        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
+        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null /* no subscription */);
         Position previousMarkDelete = null;
         for (int i = 0; i < totalEntries; i++) {
             monitor.expireMessages(1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 02073ea..0225382 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -775,6 +776,85 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
     }
 
+    @Test
+    public void testContinueDispatchMessagesWhenMessageTTL() throws Exception {
+        int defaultTTLSec = 3;
+        int totalMessages = 1000;
+        this.conf.setTtlDurationDefaultInSeconds(defaultTTLSec);
+        final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < totalMessages; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // Receive two messages but ack only the second, so the mark-delete position cannot pass the first one.
+        consumer1.receive();
+        consumer1.acknowledge(consumer1.receive());
+
+        // consumer2 and consumer3 join behind an unacked message, so they should get nothing until it moves forward.
+
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Message<Integer> received = null;
+        try {
+            received = consumer2.receive(1, TimeUnit.SECONDS);
+        } catch (PulsarClientException ignore) { // received stays null, as asserted below
+        }
+        Assert.assertNull(received);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        try {
+            received = consumer3.receive(1, TimeUnit.SECONDS);
+        } catch (PulsarClientException ignore) { // received stays null, as asserted below
+        }
+        Assert.assertNull(received);
+
+        Optional<Topic> topicRef = pulsar.getBrokerService().getTopic(topic, false).get();
+        assertTrue(topicRef.isPresent());
+        Thread.sleep((defaultTTLSec - 1) * 1000); // together with the 1s receive timeouts above, lets the TTL elapse
+        topicRef.get().checkMessageExpiry();
+
+        // Expiry moved the mark-delete position forward, so the consumers should receive new messages now.
+        for (int i = 0; i < totalMessages; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // Wait for the broker to dispatch messages to the recently joined consumers.
+        Assert.assertNotNull(consumer2.receive(1, TimeUnit.SECONDS));
+        Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)