You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/10/19 22:13:17 UTC
[pulsar] branch master updated: Fix message TTL on Key_Shared
subscription and Fix ordering issue when replay messages. (#8292)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 52e441f Fix message TTL on Key_Shared subscription and Fix ordering issue when replay messages. (#8292)
52e441f is described below
commit 52e441f29d6bf50cee051c45d7686a7d57147770
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Oct 20 06:12:41 2020 +0800
Fix message TTL on Key_Shared subscription and Fix ordering issue when replay messages. (#8292)
---
.../persistent/PersistentMessageExpiryMonitor.java | 18 +++--
.../service/persistent/PersistentReplicator.java | 6 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 41 ++++++++---
.../service/persistent/PersistentSubscription.java | 2 +-
.../service/PersistentMessageFinderTest.java | 4 +-
.../client/api/KeySharedSubscriptionTest.java | 80 ++++++++++++++++++++++
6 files changed, 129 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index b888f10..02d5a52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.stats.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final String topicName;
private final Rate msgExpired;
private final boolean autoSkipNonRecoverableData;
+ private final PersistentSubscription subscription;
private static final int FALSE = 0;
private static final int TRUE = 1;
@@ -48,14 +50,15 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private static final AtomicIntegerFieldUpdater<PersistentMessageExpiryMonitor> expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress");
- public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor) {
+ public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor, PersistentSubscription subscription) {
this.topicName = topicName;
this.cursor = cursor;
this.subName = subscriptionName;
+ this.subscription = subscription;
this.msgExpired = new Rate();
- this.autoSkipNonRecoverableData = cursor.getManagedLedger() != null // check to avoid test failures
- ? cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
- : false;
+ // check to avoid test failures
+ this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
+ && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
}
public void expireMessages(int messageTTLInSeconds) {
@@ -64,7 +67,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
messageTTLInSeconds);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
- MessageImpl msg = null;
+ MessageImpl<?> msg = null;
try {
msg = MessageImpl.deserialize(entry.getDataBuffer());
return msg.isExpired(messageTTLInSeconds);
@@ -102,7 +105,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
updateRates();
-
+ // If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
+ if (subscription != null && subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) {
+ subscription.getDispatcher().acknowledgementWasProcessed();
+ }
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Mark deleted {} messages", topicName, subName, numMessagesExpired);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index ec326b9..35f8199 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -113,7 +113,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
this.ledger = cursor.getManagedLedger();
this.cursor = cursor;
this.topic = topic;
- this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+ this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
@@ -196,7 +196,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
if (cursor != null) {
log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster);
if (expiryMonitor == null) {
- this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+ this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null);
}
return CompletableFuture.completedFuture(null);
}
@@ -206,7 +206,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster);
PersistentReplicator.this.cursor = cursor;
- PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
+ PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null);
res.complete(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 1585cae..c7d08eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,14 +18,13 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import com.google.common.base.Preconditions;
-
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -63,7 +62,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
* This means that, in order to preserve ordering, new consumers can only receive old
* messages, until the mark-delete position will move past this point.
*/
- private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
+ private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
private final Set<Consumer> stuckConsumers;
private final Set<Consumer> nextStuckConsumers;
@@ -73,7 +72,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
super(topic, cursor, subscription);
this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
- this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();
+ this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();
@@ -112,6 +111,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// If this was the 1st consumer, or if all the messages are already acked, then we
// don't need to do anything special
if (!allowOutOfOrderDelivery
+ && recentlyJoinedConsumers != null
&& consumerList.size() > 1
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
@@ -122,8 +122,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
super.removeConsumer(consumer);
selector.removeConsumer(consumer);
-
- recentlyJoinedConsumers.remove(consumer);
+ if (recentlyJoinedConsumers != null) {
+ recentlyJoinedConsumers.remove(consumer);
+ }
}
private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
@@ -169,7 +170,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
int entriesWithSameKeyCount = entriesWithSameKey.size();
final int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
- int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC);
+ int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, readType);
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
name, consumer.consumerName(), messagesForC, readType);
@@ -228,7 +229,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
stuckConsumers.clear();
- if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
+ if (totalMessagesSent == 0 && recentlyJoinedConsumers != null && recentlyJoinedConsumers.isEmpty()) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
@@ -251,13 +252,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
- private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
+ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages, ReadType readType) {
if (maxMessages == 0) {
// the consumer was stuck
nextStuckConsumers.add(consumer);
return 0;
}
+ if (recentlyJoinedConsumers == null) {
+ return maxMessages;
+ }
+
PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
if (maxReadPosition == null) {
// stop to dispatch by stuckConsumers
@@ -280,6 +285,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return maxMessages;
}
+ // If the read type is Replay, we should avoid send messages that hold by other consumer to the new consumers,
+ // For example, we have 10 messages [0,1,2,3,4,5,6,7,8,9]
+ // If the consumer0 get message 0 and 1, and does not acked message 0, then consumer1 joined,
+ // when consumer1 get message 2,3, the broker will not dispatch messages to consumer1
+ // because of the mark delete position did not move forward. So message 2,3 will stored in the redeliver tracker.
+ // Now, consumer2 joined, it will read new messages from the cursor, so the recentJoinedPosition is 4 for consumer2
+ // Because of there are messages need to redeliver, so the broker will read the redelivery message first [2,3]
+ // message [2,3] is lower than the recentJoinedPosition 4, so the message [2,3] will dispatched to the consumer2
+ // But the message [2,3] should not dispatch to consumer2.
+
+ if (readType == ReadType.Replay) {
+ PositionImpl minReadPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next();
+ if (minReadPositionForRecentJoinedConsumer != null && minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
+ maxReadPosition = minReadPositionForRecentJoinedConsumer;
+ }
+ }
// Here, the consumer is one that has recently joined, so we can only send messages that were
// published before it has joined.
for (int i = 0; i < maxMessages; i++) {
@@ -295,7 +316,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
@Override
public synchronized void acknowledgementWasProcessed() {
- if (!recentlyJoinedConsumers.isEmpty()) {
+ if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()) {
// After we process acks, we need to check whether the mark-delete position was advanced and we can finally
// read more messages. It's safe to call readMoreEntries() multiple times.
readMoreEntries();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 20ba166..f6d6917 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -147,7 +147,7 @@ public class PersistentSubscription implements Subscription {
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
- this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
+ this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.setReplicated(replicated);
IS_FENCED_UPDATER.set(this, FALSE);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index a393ae3..36ca8e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -187,7 +187,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
});
assertTrue(ex.get());
- PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
+ PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Optional.empty(), null);
Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
@@ -237,7 +237,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());
- PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
+ PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 02073ea..0225382 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -775,6 +776,85 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
}
+ @Test
+ public void testContinueDispatchMessagesWhenMessageTTL() throws Exception {
+ int defaultTTLSec = 3;
+ int totalMessages = 1000;
+ this.conf.setTtlDurationDefaultInSeconds(defaultTTLSec);
+ final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+ final String subName = "my-sub";
+
+ @Cleanup
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .create();
+
+ for (int i = 0; i < totalMessages; i++) {
+ producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(i)
+ .send();
+ }
+
+ // don't ack the first message
+ consumer1.receive();
+ consumer1.acknowledge(consumer1.receive());
+
+ // The consumer1 and consumer2 should be stucked because of the mark delete position did not move forward.
+
+ @Cleanup
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ Message<Integer> received = null;
+ try {
+ received = consumer2.receive(1, TimeUnit.SECONDS);
+ } catch (PulsarClientException ignore) {
+ }
+ Assert.assertNull(received);
+
+ @Cleanup
+ Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ try {
+ received = consumer3.receive(1, TimeUnit.SECONDS);
+ } catch (PulsarClientException ignore) {
+ }
+ Assert.assertNull(received);
+
+ Optional<Topic> topicRef = pulsar.getBrokerService().getTopic(topic, false).get();
+ assertTrue(topicRef.isPresent());
+ Thread.sleep((defaultTTLSec - 1) * 1000);
+ topicRef.get().checkMessageExpiry();
+
+ // The mark delete position is move forward, so the consumers should receive new messages now.
+ for (int i = 0; i < totalMessages; i++) {
+ producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(i)
+ .send();
+ }
+
+ // Wait broker dispatch messages.
+ Assert.assertNotNull(consumer2.receive(1, TimeUnit.SECONDS));
+ Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
+ }
+
private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)