You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 11:29:42 UTC

[pulsar] branch branch-2.8 updated: [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new e12bcc2  [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
e12bcc2 is described below

commit e12bcc288fe2cf5c2b093a4acfbb5128af90c245
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Mon Nov 15 15:02:36 2021 -0600

    [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
    
    (cherry picked from commit 9994614173205abd075fcc670396cebd71227047)
---
 .../apache/pulsar/broker/service/persistent/MessageDeduplication.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 1c25de0..b6f5146 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -141,7 +141,7 @@ public class MessageDeduplication {
     private CompletableFuture<Void> recoverSequenceIdsMap() {
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
-            inactiveProducers.put(k, System.currentTimeMillis());
+            producerRemoved(k);
             highestSequencedPushed.put(k, v);
             highestSequencedPersisted.put(k, v);
         });
@@ -172,7 +172,7 @@ public class MessageDeduplication {
                     long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
-                    inactiveProducers.put(producerName, System.currentTimeMillis());
+                    producerRemoved(producerName);
 
                     entry.release();
                 }