You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/06/07 01:54:46 UTC

[pulsar] 04/05: Fix NPE in MessageDeduplication. (#15820)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 532aa85e0d8ff78b1c99485d54dd7de224a7a41f
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jun 1 11:09:01 2022 +0800

    Fix NPE in MessageDeduplication. (#15820)
    
    (cherry picked from commit 01d7bfa681b23d1a236b1411b83e854c9ad9323f)
---
 .../pulsar/broker/service/persistent/MessageDeduplication.java     | 2 +-
 .../pulsar/broker/service/persistent/MessageDuplicationTest.java   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 5d0d8f8b1ea..7dd2ca4ba99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -493,7 +493,7 @@ public class MessageDeduplication {
                 hasInactive = true;
             }
         }
-        if (hasInactive) {
+        if (hasInactive && isEnabled()) {
             takeSnapshot(getManagedCursor().getMarkDeletedPosition());
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 5c2598ceac2..c324e13da91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -199,6 +199,13 @@ public class MessageDuplicationTest {
         messageDeduplication.purgeInactiveProducers();
         assertEquals(inactiveProducers.size(), 3);
 
+        doReturn(false).when(messageDeduplication).isEnabled();
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000);
+        messageDeduplication.purgeInactiveProducers();
+        assertFalse(inactiveProducers.containsKey(producerName2));
+        assertFalse(inactiveProducers.containsKey(producerName3));
+        doReturn(true).when(messageDeduplication).isEnabled();
         // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
         inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);