You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 11:29:42 UTC
[pulsar] branch branch-2.8 updated: [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new e12bcc2 [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
e12bcc2 is described below
commit e12bcc288fe2cf5c2b093a4acfbb5128af90c245
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Mon Nov 15 15:02:36 2021 -0600
[Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
(cherry picked from commit 9994614173205abd075fcc670396cebd71227047)
---
.../apache/pulsar/broker/service/persistent/MessageDeduplication.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 1c25de0..b6f5146 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -141,7 +141,7 @@ public class MessageDeduplication {
private CompletableFuture<Void> recoverSequenceIdsMap() {
// Load the sequence ids from the snapshot in the cursor properties
managedCursor.getProperties().forEach((k, v) -> {
- inactiveProducers.put(k, System.currentTimeMillis());
+ producerRemoved(k);
highestSequencedPushed.put(k, v);
highestSequencedPersisted.put(k, v);
});
@@ -172,7 +172,7 @@ public class MessageDeduplication {
long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
- inactiveProducers.put(producerName, System.currentTimeMillis());
+ producerRemoved(producerName);
entry.release();
}