Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/18 16:43:08 UTC

[GitHub] [pulsar] shibd opened a new pull request, #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

shibd opened a new pull request, #15206:
URL: https://github.com/apache/pulsar/pull/15206

   ### Motivation
   
   #15204 
   
   In the current implementation, the first time `purgeInactiveProducers` executes, a producer is removed directly from the collection (line 464) even though it has not expired. As a result, these producers are never purged.
   
   https://github.com/apache/pulsar/blob/9861dfb1208c4b6b8a1f17ef026e9af71c3e784c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java#L454-L472
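
The intended behavior can be illustrated with a minimal, self-contained sketch. It is not the actual `MessageDeduplication` code: the class `PurgeSketch` and the method `purgeInactive` are hypothetical names, and a plain `Map` stands in for `inactiveProducers`. It only shows the idea of removing an entry when its timestamp is older than the cutoff and leaving every other entry in place for a later pass.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for the purge loop; not the real MessageDeduplication class.
class PurgeSketch {

    /**
     * Removes only the producers whose last-active timestamp is older than the cutoff.
     * Producers that have not expired stay in the map so a later pass can still purge them.
     * Returns true if at least one producer was purged.
     */
    static boolean purgeInactive(Map<String, Long> inactiveProducers, long minimumActiveTimestamp) {
        boolean hasInactive = false;
        Iterator<Map.Entry<String, Long>> it = inactiveProducers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < minimumActiveTimestamp) {
                it.remove(); // safe removal while iterating
                hasInactive = true;
            }
        }
        return hasInactive;
    }

    public static void main(String[] args) {
        Map<String, Long> inactive = new HashMap<>();
        inactive.put("producer-1", 1_000L); // older than the cutoff, so it is purged
        inactive.put("producer-2", 5_000L); // newer than the cutoff, so it is kept
        boolean purged = purgeInactive(inactive, 2_000L);
        System.out.println(purged + " " + inactive.keySet()); // prints: true [producer-2]
    }
}
```

With the unconditional removal described above, `producer-2` would also have left the map on the first pass and would never be examined again.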
   
   ### Modifications
   
   1. A producer is removed from the collection only when it is inactive.
   2. Take a snapshot after inactive producers are removed. When `managedLedger.getLastConfirmedEntry()` equals `managedCursor.getMarkDeletedPosition()`, the `deduplication-snapshot-monitor` thread does not trigger a snapshot, so the removal of these producers is persisted only the next time a message is produced. This can be confusing for users.
   
   ```
           PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
           if (position == null) {
               return;
           }
           PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
           if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
               return;
           }
   ```
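
To make the effect of this guard concrete, here is a small sketch under stated assumptions. `SnapshotGuardSketch`, its nested `Position` class, and `shouldSnapshot` are hypothetical names introduced for illustration; `Position` is a simplified (ledgerId, entryId) pair, not Pulsar's `PositionImpl`. The sketch mirrors only the two early returns quoted above.

```java
// Hypothetical, simplified model of the guard; not Pulsar's actual classes.
class SnapshotGuardSketch {

    /** Simplified position: ordered by ledger id, then by entry id. */
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    /** Returns true only when the guard would let a snapshot proceed. */
    static boolean shouldSnapshot(Position lastConfirmedEntry, Position markDeletedPosition) {
        if (lastConfirmedEntry == null) {
            return false; // nothing has been written yet
        }
        if (markDeletedPosition != null && lastConfirmedEntry.compareTo(markDeletedPosition) <= 0) {
            return false; // the cursor has already caught up with the last entry
        }
        return true;
    }

    public static void main(String[] args) {
        Position last = new Position(1, 5);
        // Cursor caught up and no new messages: the periodic snapshot is skipped.
        System.out.println(shouldSnapshot(last, new Position(1, 5))); // prints: false
        // A new message moved the last confirmed entry forward: a snapshot proceeds.
        System.out.println(shouldSnapshot(new Position(1, 6), new Position(1, 5))); // prints: true
    }
}
```

The first case is the situation the PR description refers to: with no new writes, a timer that goes through this guard never persists the purge.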
   
   
   
   ### Documentation
   - [x] `no-need-doc` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r852501207


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,22 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;

Review Comment:
   Thanks, you are right. I missed a line. Fixed, PTAL.





[GitHub] [pulsar] 315157973 commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r852516257


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,23 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
             String producerName = entry.getKey();
             long lastActiveTimestamp = entry.getValue();
 
-            mapIterator.remove();
-
             if (lastActiveTimestamp < minimumActiveTimestamp) {
                 log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+                mapIterator.remove();
                 highestSequencedPushed.remove(producerName);
                 highestSequencedPersisted.remove(producerName);
+                hasInactive = true;
             }
         }
+        if (hasInactive) {
+            takeSnapshot(getManagedCursor().getMarkDeletedPosition());

Review Comment:
   If there are a large number of topics, snapshots may be created concurrently. We should avoid creating snapshots frequently. This is currently done by the timer thread.





[GitHub] [pulsar] 315157973 commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r852965699


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,23 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
             String producerName = entry.getKey();
             long lastActiveTimestamp = entry.getValue();
 
-            mapIterator.remove();
-
             if (lastActiveTimestamp < minimumActiveTimestamp) {
                 log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+                mapIterator.remove();
                 highestSequencedPushed.remove(producerName);
                 highestSequencedPersisted.remove(producerName);
+                hasInactive = true;
             }
         }
+        if (hasInactive) {
+            takeSnapshot(getManagedCursor().getMarkDeletedPosition());

Review Comment:
   `checkMessageDeduplicationInfo` is not a frequent operation, but it traverses all topics each time it is executed. If many topics have producers that need to be purged, a large number of snapshots will be generated.
   Currently, `takeSnapshot` is automatically executed every 120 seconds, so we do not need to generate a snapshot after each topic is purged. Alternatively, we could wait until all topics are cleaned up and then create a snapshot.





[GitHub] [pulsar] shibd commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r853255991


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,23 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
             String producerName = entry.getKey();
             long lastActiveTimestamp = entry.getValue();
 
-            mapIterator.remove();
-
             if (lastActiveTimestamp < minimumActiveTimestamp) {
                 log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+                mapIterator.remove();
                 highestSequencedPushed.remove(producerName);
                 highestSequencedPersisted.remove(producerName);
+                hasInactive = true;
             }
         }
+        if (hasInactive) {
+            takeSnapshot(getManagedCursor().getMarkDeletedPosition());

Review Comment:
   Thanks for your reply. The main reason I trigger a snapshot in the `purgeInactiveProducers` method is that, if we rely on the `deduplication-snapshot-monitor` timer to take the snapshot, it may not be executed, because of this optimization in the `takeSnapshot()` method:
   ```java
           PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
           if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
               return;
           }
   ```
   
   When `dup.cursor.markDeletedPosition == ledger.lastConfirmedEntry` and all producers have stopped, we will not be able to clean up these producers until new data is written and a snapshot is taken. This can be confusing for users.
   
   > but it traverses all topics each time when it is executed. If many topics have Producers that need to be purged, a large number of snapshots will be generated.
   
   In effect, the automatic `takeSnapshot` also traverses all topics.
   
   > we do not need to generate a snapshot after each topic is purged. Or, wait until all topics are cleaned up and create a snapshot.
   
   This PR only takes a snapshot when a topic has inactive producers to purge.
   
   In conclusion, the snapshot here solves the problem described above, and `checkMessageDeduplicationInfo` is not a frequent operation.
   
   Thanks again. If you have any suggestions, please continue the discussion.
   
   





[GitHub] [pulsar] shibd commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r859646348


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java:
##########
@@ -187,26 +190,28 @@ public void testInactiveProducerRemove() throws Exception {
         when(publishContext.getProducerName()).thenReturn(producerName3);
         messageDeduplication.isDuplicate(publishContext, null);
 
+        // All 3 are added to the inactiveProducers list
         messageDeduplication.producerRemoved(producerName1);
-        assertTrue(map.containsKey(producerName1));
-        messageDeduplication.producerAdded(producerName1);
-        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.producerRemoved(producerName2);
+        messageDeduplication.producerRemoved(producerName3);
+
+        // Try first purgeInactive, all producer not inactive.
         messageDeduplication.purgeInactiveProducers();
+        assertEquals(inactiveProducers.size(), 3);
+
+        // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
-        map.put(producerName2, System.currentTimeMillis() - 70000);
-        map.put(producerName3, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
+        // Try second purgeInactive, produce2 and produce3 is inactive.
         messageDeduplication.purgeInactiveProducers();
-        assertFalse(map.containsKey(producerName2));
-        assertFalse(map.containsKey(producerName3));

Review Comment:
   ping @eolivelli 







[GitHub] [pulsar] codelipenghui commented on pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#issuecomment-1107839955

   /pulsarbot run-failure-checks






[GitHub] [pulsar] shibd commented on pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#issuecomment-1107868131

   /pulsarbot run-failure-checks




[GitHub] [pulsar] shibd commented on pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#issuecomment-1102229270

   /pulsarbot run-failure-checks




[GitHub] [pulsar] eolivelli commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r857146961


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java:
##########
@@ -187,26 +190,28 @@ public void testInactiveProducerRemove() throws Exception {
         when(publishContext.getProducerName()).thenReturn(producerName3);
         messageDeduplication.isDuplicate(publishContext, null);
 
+        // All 3 are added to the inactiveProducers list
         messageDeduplication.producerRemoved(producerName1);
-        assertTrue(map.containsKey(producerName1));
-        messageDeduplication.producerAdded(producerName1);
-        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.producerRemoved(producerName2);
+        messageDeduplication.producerRemoved(producerName3);
+
+        // Try first purgeInactive, all producer not inactive.
         messageDeduplication.purgeInactiveProducers();
+        assertEquals(inactiveProducers.size(), 3);
+
+        // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
-        map.put(producerName2, System.currentTimeMillis() - 70000);
-        map.put(producerName3, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
+        // Try second purgeInactive, produce2 and produce3 is inactive.
         messageDeduplication.purgeInactiveProducers();
-        assertFalse(map.containsKey(producerName2));
-        assertFalse(map.containsKey(producerName3));

Review Comment:
   Why are you removing this assertion?





[GitHub] [pulsar] shibd commented on pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#issuecomment-1107974284

   /pulsarbot run-failure-checks




[GitHub] [pulsar] shibd commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r857194236


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java:
##########
@@ -187,26 +190,28 @@ public void testInactiveProducerRemove() throws Exception {
         when(publishContext.getProducerName()).thenReturn(producerName3);
         messageDeduplication.isDuplicate(publishContext, null);
 
+        // All 3 are added to the inactiveProducers list
         messageDeduplication.producerRemoved(producerName1);
-        assertTrue(map.containsKey(producerName1));
-        messageDeduplication.producerAdded(producerName1);
-        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.producerRemoved(producerName2);
+        messageDeduplication.producerRemoved(producerName3);
+
+        // Try first purgeInactive, all producer not inactive.
         messageDeduplication.purgeInactiveProducers();
+        assertEquals(inactiveProducers.size(), 3);
+
+        // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
-        map.put(producerName2, System.currentTimeMillis() - 70000);
-        map.put(producerName3, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
+        // Try second purgeInactive, produce2 and produce3 is inactive.
         messageDeduplication.purgeInactiveProducers();
-        assertFalse(map.containsKey(producerName2));
-        assertFalse(map.containsKey(producerName3));

Review Comment:
   Thank you for your review. This assertion can be kept, so I have restored it.





[GitHub] [pulsar] codelipenghui merged pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #15206:
URL: https://github.com/apache/pulsar/pull/15206




[GitHub] [pulsar] shibd commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r857194909


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java:
##########
@@ -187,26 +190,28 @@ public void testInactiveProducerRemove() throws Exception {
         when(publishContext.getProducerName()).thenReturn(producerName3);
         messageDeduplication.isDuplicate(publishContext, null);
 
+        // All 3 are added to the inactiveProducers list
         messageDeduplication.producerRemoved(producerName1);
-        assertTrue(map.containsKey(producerName1));
-        messageDeduplication.producerAdded(producerName1);
-        assertFalse(map.containsKey(producerName1));
+        messageDeduplication.producerRemoved(producerName2);
+        messageDeduplication.producerRemoved(producerName3);
+
+        // Try first purgeInactive, all producer not inactive.
         messageDeduplication.purgeInactiveProducers();
+        assertEquals(inactiveProducers.size(), 3);
+
+        // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
-        map.put(producerName2, System.currentTimeMillis() - 70000);
-        map.put(producerName3, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 70000);
+        // Try second purgeInactive, produce2 and produce3 is inactive.
         messageDeduplication.purgeInactiveProducers();
-        assertFalse(map.containsKey(producerName2));
-        assertFalse(map.containsKey(producerName3));

Review Comment:
   @eolivelli Could you take another look?





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r852481522


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,22 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;

Review Comment:
   I can do a deeper review later, thanks.





[GitHub] [pulsar] shibd commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r852520591


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,23 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
             String producerName = entry.getKey();
             long lastActiveTimestamp = entry.getValue();
 
-            mapIterator.remove();
-
             if (lastActiveTimestamp < minimumActiveTimestamp) {
                 log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+                mapIterator.remove();
                 highestSequencedPushed.remove(producerName);
                 highestSequencedPersisted.remove(producerName);
+                hasInactive = true;
             }
         }
+        if (hasInactive) {
+            takeSnapshot(getManagedCursor().getMarkDeletedPosition());

Review Comment:
   I think this is a low-frequency operation. The inactivity check interval is configured by `brokerDeduplicationProducerInactivityTimeoutMinutes`, whose default value is 6 hours.
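
For a sense of scale, the timeout arithmetic can be sketched as follows. This is a sketch under an assumption: the diff context shows only the `.toMillis(...)` call, so computing the cutoff as the current time minus the configured timeout is assumed here rather than quoted. `InactivityCutoffSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the cutoff arithmetic; not part of Pulsar.
class InactivityCutoffSketch {

    /** Assumed cutoff: producers last active before this timestamp count as expired. */
    static long minimumActiveTimestamp(long nowMillis, long timeoutMinutes) {
        return nowMillis - TimeUnit.MINUTES.toMillis(timeoutMinutes);
    }

    /** Same strict comparison as the `lastActiveTimestamp < minimumActiveTimestamp` check in the diff. */
    static boolean isExpired(long lastActiveTimestamp, long nowMillis, long timeoutMinutes) {
        return lastActiveTimestamp < minimumActiveTimestamp(nowMillis, timeoutMinutes);
    }

    public static void main(String[] args) {
        long sixHoursInMinutes = 360; // the 6-hour default mentioned above
        System.out.println(TimeUnit.MINUTES.toMillis(sixHoursInMinutes)); // prints: 21600000
        long now = System.currentTimeMillis();
        // A producer last seen 7 hours ago is past the 6-hour timeout.
        System.out.println(isExpired(now - TimeUnit.HOURS.toMillis(7), now, sixHoursInMinutes)); // prints: true
    }
}
```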





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r852478801


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,22 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;

Review Comment:
   At a glance, it looks like this value is always false.





[GitHub] [pulsar] codelipenghui commented on pull request #15206: Fix MessageDeduplication#inactiveProducers may not be persistence correctly

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#issuecomment-1109412032

   @eolivelli Please help review this PR, thanks.



