You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/27 03:13:49 UTC

[pulsar] branch branch-2.7 updated: [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new a8694d75157 [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)
a8694d75157 is described below

commit a8694d751578227b33273f2fb2e84f16108ac91b
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Apr 28 15:06:11 2022 +0800

    [fix][broker] Fix MessageDeduplication#inactiveProducers may not be persistence correctly (#15206)
    
    ### Motivation
    
    #15204
    
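    In the current implementation, every entry is removed from the `inactiveProducers` collection as soon as `purgeInactiveProducers` runs (line 464), even when that producer has not yet expired. Such a producer disappears from `inactiveProducers` without its deduplication state being purged, and it is never checked again, so its deduplication information is never removed.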
    
    https://github.com/apache/pulsar/blob/9861dfb1208c4b6b8a1f17ef026e9af71c3e784c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java#L454-L472
    
    ### Modifications
    
    1. A producer is removed from the collection only once it is actually inactive, i.e. its last-active timestamp is older than the configured inactivity timeout.
    2. Take a snapshot whenever a purge pass removes at least one inactive producer. When `managedLedger.getLastConfirmedEntry()` equals `managedCursor.getMarkDeletedPosition()`, the `deduplication-snapshot-monitor` thread does not trigger a snapshot (see the check below). Without an explicit snapshot, the removal of these producers would only be persisted the next time a message is produced, which can be confusing for users.
    
    ```
            PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
            if (position == null) {
                return;
            }
            PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
            if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
                return;
            }
    ```
    
    (cherry picked from commit 8e1ca487c1026510fee264d65a34067ac427ee9d)
---
 .../service/persistent/MessageDeduplication.java   | 13 +++++++---
 .../service/persistent/MessageDuplicationTest.java | 29 ++++++++++++++--------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 12c6bccfecf..2e5300b2678 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -402,7 +403,7 @@ public class MessageDeduplication {
         }
     }
 
-    private void takeSnapshot(PositionImpl position) {
+    private void takeSnapshot(Position position) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
         }
@@ -413,7 +414,7 @@ public class MessageDeduplication {
             }
         });
 
-        managedCursor.asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
+        getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
             @Override
             public void markDeleteComplete(Object ctx) {
                 if (log.isDebugEnabled()) {
@@ -471,19 +472,23 @@ public class MessageDeduplication {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
             String producerName = entry.getKey();
             long lastActiveTimestamp = entry.getValue();
 
-            mapIterator.remove();
-
             if (lastActiveTimestamp < minimumActiveTimestamp) {
                 log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+                mapIterator.remove();
                 highestSequencedPushed.remove(producerName);
                 highestSequencedPersisted.remove(producerName);
+                hasInactive = true;
             }
         }
+        if (hasInactive) {
+            takeSnapshot(getManagedCursor().getMarkDeletedPosition());
+        }
     }
 
     public long getLastPublishedSequenceId(String producerName) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index e172ac27115..592c01b7faf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -31,12 +31,12 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -165,11 +165,14 @@ public class MessageDuplicationTest {
         MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, topic, managedLedger));
         doReturn(true).when(messageDeduplication).isEnabled();
 
+        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        doReturn(managedCursor).when(messageDeduplication).getManagedCursor();
+
         Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
 
         Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers");
         field.setAccessible(true);
-        Map<String, Long> map = (Map<String, Long>) field.get(messageDeduplication);
+        Map<String, Long> inactiveProducers = (Map<String, Long>) field.get(messageDeduplication);
 
         String producerName1 = "test1";
         when(publishContext.getHighestSequenceId()).thenReturn(2L);
@@ -185,18 +188,23 @@ public class MessageDuplicationTest {
         when(publishContext.getProducerName()).thenReturn(producerName3);
         messageDeduplication.isDuplicate(publishContext, null);
 
+        // All 3 are added to the inactiveProducers list
         messageDeduplication.producerRemoved(producerName1);
-        assertTrue(map.containsKey(producerName1));
-        messageDeduplication.producerAdded(producerName1);
-        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.producerRemoved(producerName2);
+        messageDeduplication.producerRemoved(producerName3);
+
+        // Try first purgeInactive, all producer not inactive.
         messageDeduplication.purgeInactiveProducers();
+        assertEquals(inactiveProducers.size(), 3);
+
+        // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
-        map.put(producerName2, System.currentTimeMillis() - 70000);
-        map.put(producerName3, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
+        // Try second purgeInactive, produce2 and produce3 is inactive.
         messageDeduplication.purgeInactiveProducers();
-        assertFalse(map.containsKey(producerName2));
-        assertFalse(map.containsKey(producerName3));
-
+        assertFalse(inactiveProducers.containsKey(producerName2));
+        assertFalse(inactiveProducers.containsKey(producerName3));
         field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
         field.setAccessible(true);
         ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
@@ -204,7 +212,6 @@ public class MessageDuplicationTest {
         assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
         assertFalse(highestSequencedPushed.containsKey(producerName2));
         assertFalse(highestSequencedPushed.containsKey(producerName3));
-
     }
 
     @Test