You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 07:06:20 UTC
[pulsar] branch master updated: [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8e1ca487c10 [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)
8e1ca487c10 is described below
commit 8e1ca487c1026510fee264d65a34067ac427ee9d
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Apr 28 15:06:11 2022 +0800
[fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)
### Motivation
#15204
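In the current implementation, the first time `purgeInactiveProducers` runs, every producer is removed from the `inactiveProducers` collection (line 464), even if it has not yet expired. Because the non-expired producers are dropped from that collection without being purged, their deduplication state (`highestSequencedPushed` / `highestSequencedPersisted`) is never removed.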
https://github.com/apache/pulsar/blob/9861dfb1208c4b6b8a1f17ef026e9af71c3e784c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java#L454-L472
### Modifications
1. A producer is removed from the `inactiveProducers` collection only when it has actually been inactive for longer than the configured inactivity timeout.
2. Take a snapshot after a purge pass that removes at least one inactive producer. When `managedLedger.getLastConfirmedEntry()` equals `managedCursor.getMarkDeletedPosition()`, the `deduplication-snapshot-monitor` thread does not trigger a snapshot. Without this change, the removal of these producers would be persisted only the next time a message is produced, which can be confusing for users.
```
PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
if (position == null) {
return;
}
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
return;
}
```
---
.../service/persistent/MessageDeduplication.java | 13 +++++++---
.../service/persistent/MessageDuplicationTest.java | 29 ++++++++++++++--------
2 files changed, 27 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 90ee3b67e3c..761a8a65d2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -401,7 +402,7 @@ public class MessageDeduplication {
}
}
- private void takeSnapshot(PositionImpl position) {
+ private void takeSnapshot(Position position) {
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
}
@@ -412,7 +413,7 @@ public class MessageDeduplication {
}
});
- managedCursor.asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
+ getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
@@ -456,19 +457,23 @@ public class MessageDeduplication {
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+ boolean hasInactive = false;
while (mapIterator.hasNext()) {
java.util.Map.Entry<String, Long> entry = mapIterator.next();
String producerName = entry.getKey();
long lastActiveTimestamp = entry.getValue();
- mapIterator.remove();
-
if (lastActiveTimestamp < minimumActiveTimestamp) {
log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+ mapIterator.remove();
highestSequencedPushed.remove(producerName);
highestSequencedPersisted.remove(producerName);
+ hasInactive = true;
}
}
+ if (hasInactive) {
+ takeSnapshot(getManagedCursor().getMarkDeletedPosition());
+ }
}
public long getLastPublishedSequenceId(String producerName) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index f62a65ad36a..765d7463f98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -32,12 +32,12 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -167,11 +167,14 @@ public class MessageDuplicationTest {
MessageDeduplication messageDeduplication = spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, topic, managedLedger);
doReturn(true).when(messageDeduplication).isEnabled();
+ ManagedCursor managedCursor = mock(ManagedCursor.class);
+ doReturn(managedCursor).when(messageDeduplication).getManagedCursor();
+
Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers");
field.setAccessible(true);
- Map<String, Long> map = (Map<String, Long>) field.get(messageDeduplication);
+ Map<String, Long> inactiveProducers = (Map<String, Long>) field.get(messageDeduplication);
String producerName1 = "test1";
when(publishContext.getHighestSequenceId()).thenReturn(2L);
@@ -187,18 +190,23 @@ public class MessageDuplicationTest {
when(publishContext.getProducerName()).thenReturn(producerName3);
messageDeduplication.isDuplicate(publishContext, null);
+ // All 3 are added to the inactiveProducers list
messageDeduplication.producerRemoved(producerName1);
- assertTrue(map.containsKey(producerName1));
- messageDeduplication.producerAdded(producerName1);
- assertFalse(map.containsKey(producerName1));
+ messageDeduplication.producerRemoved(producerName2);
+ messageDeduplication.producerRemoved(producerName3);
+
+ // Try first purgeInactive, all producer not inactive.
messageDeduplication.purgeInactiveProducers();
+ assertEquals(inactiveProducers.size(), 3);
+
+ // Modify the inactive time of produce2 and produce3
// messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
- map.put(producerName2, System.currentTimeMillis() - 70000);
- map.put(producerName3, System.currentTimeMillis() - 70000);
+ inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
+ inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
+ // Try second purgeInactive, produce2 and produce3 is inactive.
messageDeduplication.purgeInactiveProducers();
- assertFalse(map.containsKey(producerName2));
- assertFalse(map.containsKey(producerName3));
-
+ assertFalse(inactiveProducers.containsKey(producerName2));
+ assertFalse(inactiveProducers.containsKey(producerName3));
field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
field.setAccessible(true);
ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
@@ -206,7 +214,6 @@ public class MessageDuplicationTest {
assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
assertFalse(highestSequencedPushed.containsKey(producerName2));
assertFalse(highestSequencedPushed.containsKey(producerName3));
-
}
@Test