You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:29 UTC

[pulsar] 25/26: [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0f15d122fbda0e7e4f8c9dd0ef44f79bb0ba3b3f
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Apr 28 15:06:11 2022 +0800

    [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)
    
    ### Motivation
    
    #15204
    
    In the current implementation, the first time `purgeInactiveProducers` runs, every producer is removed from the `inactiveProducers` collection immediately (line 464), even if it has not yet expired. Such a producer is no longer tracked as inactive, so its entries in `highestSequencedPushed` and `highestSequencedPersisted` are never purged.
    
    https://github.com/apache/pulsar/blob/9861dfb1208c4b6b8a1f17ef026e9af71c3e784c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java#L454-L472
    
    ### Modifications
    
    1. It is removed from the collection only when the producer is inactive.
    2. Take a snapshot whenever a purge pass removes at least one inactive producer. The `deduplication-snapshot-monitor` thread skips the snapshot when `managedLedger.getLastConfirmedEntry()` is not ahead of `managedCursor.getMarkDeletedPosition()`. In that case, the removal of these producers would only be persisted the next time a message is produced, which can confuse users.
    
    ```
            PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
            if (position == null) {
                return;
            }
            PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
            if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
                return;
            }
    ```
    
    (cherry picked from commit 8e1ca487c1026510fee264d65a34067ac427ee9d)
---
 .../service/persistent/MessageDeduplication.java   | 13 +++++++---
 .../service/persistent/MessageDuplicationTest.java | 29 ++++++++++++++--------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 90ee3b67e3c..761a8a65d2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -401,7 +402,7 @@ public class MessageDeduplication {
         }
     }
 
-    private void takeSnapshot(PositionImpl position) {
+    private void takeSnapshot(Position position) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
         }
@@ -412,7 +413,7 @@ public class MessageDeduplication {
             }
         });
 
-        managedCursor.asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
+        getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 if (log.isDebugEnabled()) {
@@ -456,19 +457,23 @@ public class MessageDeduplication {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
             String producerName = entry.getKey();
             long lastActiveTimestamp = entry.getValue();
 
-            mapIterator.remove();
-
             if (lastActiveTimestamp < minimumActiveTimestamp) {
                 log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+                mapIterator.remove();
                 highestSequencedPushed.remove(producerName);
                 highestSequencedPersisted.remove(producerName);
+                hasInactive = true;
             }
         }
+        if (hasInactive) {
+            takeSnapshot(getManagedCursor().getMarkDeletedPosition());
+        }
     }
 
     public long getLastPublishedSequenceId(String producerName) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index f62a65ad36a..765d7463f98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -32,12 +32,12 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.lang.reflect.Field;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -167,11 +167,14 @@ public class MessageDuplicationTest {
         MessageDeduplication messageDeduplication = spyWithClassAndConstructorArgs(MessageDeduplication.class, pulsarService, topic, managedLedger);
         doReturn(true).when(messageDeduplication).isEnabled();
 
+        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        doReturn(managedCursor).when(messageDeduplication).getManagedCursor();
+
         Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
 
         Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers");
         field.setAccessible(true);
-        Map<String, Long> map = (Map<String, Long>) field.get(messageDeduplication);
+        Map<String, Long> inactiveProducers = (Map<String, Long>) field.get(messageDeduplication);
 
         String producerName1 = "test1";
         when(publishContext.getHighestSequenceId()).thenReturn(2L);
@@ -187,18 +190,23 @@ public class MessageDuplicationTest {
         when(publishContext.getProducerName()).thenReturn(producerName3);
         messageDeduplication.isDuplicate(publishContext, null);
 
+        // All 3 are added to the inactiveProducers list
         messageDeduplication.producerRemoved(producerName1);
-        assertTrue(map.containsKey(producerName1));
-        messageDeduplication.producerAdded(producerName1);
-        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.producerRemoved(producerName2);
+        messageDeduplication.producerRemoved(producerName3);
+
+        // Try first purgeInactive, all producer not inactive.
         messageDeduplication.purgeInactiveProducers();
+        assertEquals(inactiveProducers.size(), 3);
+
+        // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
-        map.put(producerName2, System.currentTimeMillis() - 70000);
-        map.put(producerName3, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
+        // Try second purgeInactive, produce2 and produce3 is inactive.
         messageDeduplication.purgeInactiveProducers();
-        assertFalse(map.containsKey(producerName2));
-        assertFalse(map.containsKey(producerName3));
-
+        assertFalse(inactiveProducers.containsKey(producerName2));
+        assertFalse(inactiveProducers.containsKey(producerName3));
         field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
         field.setAccessible(true);
         ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
@@ -206,7 +214,6 @@ public class MessageDuplicationTest {
         assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
         assertFalse(highestSequencedPushed.containsKey(producerName2));
         assertFalse(highestSequencedPushed.containsKey(producerName3));
-
     }
 
     @Test