You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:40 UTC
[pulsar] 10/17: Fix NPE in MessageDeduplication. (#15820)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b4c704e1f6c2f8a93b702700d4a53747da4c5fae
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jun 1 11:09:01 2022 +0800
Fix NPE in MessageDeduplication. (#15820)
(cherry picked from commit 01d7bfa681b23d1a236b1411b83e854c9ad9323f)
---
.../pulsar/broker/service/persistent/MessageDeduplication.java | 2 +-
.../pulsar/broker/service/persistent/MessageDuplicationTest.java | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 761a8a65d2a..486f3d8540d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -471,7 +471,7 @@ public class MessageDeduplication {
hasInactive = true;
}
}
- if (hasInactive) {
+ if (hasInactive && isEnabled()) {
takeSnapshot(getManagedCursor().getMarkDeletedPosition());
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 765d7463f98..8b379928a83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -199,6 +199,13 @@ public class MessageDuplicationTest {
messageDeduplication.purgeInactiveProducers();
assertEquals(inactiveProducers.size(), 3);
+ doReturn(false).when(messageDeduplication).isEnabled();
+ inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000);
+ inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000);
+ messageDeduplication.purgeInactiveProducers();
+ assertFalse(inactiveProducers.containsKey(producerName2));
+ assertFalse(inactiveProducers.containsKey(producerName3));
+ doReturn(true).when(messageDeduplication).isEnabled();
// Modify the inactive time of produce2 and produce3
// messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);