You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 11:37:33 UTC

[pulsar] branch branch-2.9 updated (b55e740 -> b87bd78)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from b55e740  [Java Client] Fix producer data race to get cnx (#13176)
     new 87933e9  [Broker] Fix messageDedup delete inactive producer name (#12493)
     new b87bd78  [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../service/persistent/MessageDeduplication.java   |  2 +
 .../service/persistent/MessageDuplicationTest.java | 67 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

[pulsar] 01/02: [Broker] Fix messageDedup delete inactive producer name (#12493)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 87933e9bed91adbf06b47471e51a8bf7e4b3a127
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Oct 27 10:01:39 2021 +0800

    [Broker] Fix messageDedup delete inactive producer name (#12493)
    
    ## Issue
    Currently, an inactive producerName is removed from MessageDeduplication only after its producer closes. But if the producer was closed before the topic was unloaded, this producerName will never be removed unless a producer connects to the broker again with the same producerName.
    
    ## Implementation
    
    When a topic recovers `MessageDeduplication`, we should put every producerName into the inactive producer map. When a producer with the same name connects, we remove that name from the inactive map; if no producer with this name connects within brokerDeduplicationProducerInactivityTimeoutMinutes, we can remove the producerName.
    
    (cherry picked from commit 928924b5a37dbab8eea791200d328739d72da016)
---
 .../service/persistent/MessageDeduplication.java   |  2 +
 .../service/persistent/MessageDuplicationTest.java | 67 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 0df50cb..b2c42b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -138,6 +138,7 @@ public class MessageDeduplication {
     private CompletableFuture<Void> recoverSequenceIdsMap() {
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
+            inactiveProducers.put(k, System.currentTimeMillis());
             highestSequencedPushed.put(k, v);
             highestSequencedPersisted.put(k, v);
         });
@@ -168,6 +169,7 @@ public class MessageDeduplication {
                     long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
+                    inactiveProducers.put(producerName, System.currentTimeMillis());
 
                     entry.release();
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index bfef2b7..4dc7f7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -30,7 +30,13 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -40,8 +46,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 @Slf4j
 @Test(groups = "broker")
@@ -143,6 +152,64 @@ public class MessageDuplicationTest {
     }
 
     @Test
+    public void testInactiveProducerRemove() throws Exception {
+        PulsarService pulsarService = mock(PulsarService.class);
+        PersistentTopic topic = mock(PersistentTopic.class);
+        ManagedLedger managedLedger = mock(ManagedLedger.class);
+
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
+        serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
+        serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
+        serviceConfiguration.setBrokerDeduplicationProducerInactivityTimeoutMinutes(1);
+
+        doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+        MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, topic, managedLedger));
+        doReturn(true).when(messageDeduplication).isEnabled();
+
+        Topic.PublishContext publishContext = mock(Topic.PublishContext.class);
+
+        Field field = MessageDeduplication.class.getDeclaredField("inactiveProducers");
+        field.setAccessible(true);
+        Map<String, Long> map = (Map<String, Long>) field.get(messageDeduplication);
+
+        String producerName1 = "test1";
+        when(publishContext.getHighestSequenceId()).thenReturn(2L);
+        when(publishContext.getSequenceId()).thenReturn(1L);
+        when(publishContext.getProducerName()).thenReturn(producerName1);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        String producerName2 = "test2";
+        when(publishContext.getProducerName()).thenReturn(producerName2);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        String producerName3 = "test3";
+        when(publishContext.getProducerName()).thenReturn(producerName3);
+        messageDeduplication.isDuplicate(publishContext, null);
+
+        messageDeduplication.producerRemoved(producerName1);
+        assertTrue(map.containsKey(producerName1));
+        messageDeduplication.producerAdded(producerName1);
+        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.purgeInactiveProducers();
+        // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
+        map.put(producerName2, System.currentTimeMillis() - 70000);
+        map.put(producerName3, System.currentTimeMillis() - 70000);
+        messageDeduplication.purgeInactiveProducers();
+        assertFalse(map.containsKey(producerName2));
+        assertFalse(map.containsKey(producerName3));
+
+        field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
+        field.setAccessible(true);
+        ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
+
+        assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
+        assertFalse(highestSequencedPushed.containsKey(producerName2));
+        assertFalse(highestSequencedPushed.containsKey(producerName3));
+
+    }
+
+    @Test
     public void testIsDuplicateWithFailure() {
 
         PulsarService pulsarService = mock(PulsarService.class);

[pulsar] 02/02: [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b87bd78683c5bdba1c430151592fcbb710b5bdbe
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Mon Nov 15 15:02:36 2021 -0600

    [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
    
    (cherry picked from commit 9994614173205abd075fcc670396cebd71227047)
---
 .../apache/pulsar/broker/service/persistent/MessageDeduplication.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index b2c42b0..e2436bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -138,7 +138,7 @@ public class MessageDeduplication {
     private CompletableFuture<Void> recoverSequenceIdsMap() {
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
-            inactiveProducers.put(k, System.currentTimeMillis());
+            producerRemoved(k);
             highestSequencedPushed.put(k, v);
             highestSequencedPersisted.put(k, v);
         });
@@ -169,7 +169,7 @@ public class MessageDeduplication {
                     long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
-                    inactiveProducers.put(producerName, System.currentTimeMillis());
+                    producerRemoved(producerName);
 
                     entry.release();
                 }