Posted to commits@pulsar.apache.org by "poorbarcode (via GitHub)" <gi...@apache.org> on 2023/10/23 16:20:52 UTC

[PR] [improve broker] Stop dispatch messages if the individual acks will b… [pulsar]

poorbarcode opened a new pull request, #21423:
URL: https://github.com/apache/pulsar/pull/21423

   ### Motivation & Modifications
   
   Part 1 of [PIP-299](https://github.com/apache/pulsar/pull/21118/files?short_path=cf766b5#diff-cf766b5d463b6832017e482baad14832f6a4d41dc969da279b98b69e26ec6f6a): the implementation of "Stop dispatch messages if the individual acks will be lost in the persistent storage"
   
   Subsequent PRs:
   - part-2: [add config dispatcherPauseOnAckStatePersistentEnabled](https://github.com/apache/pulsar/pull/21370)
   - part-3: Dynamic config 
   - part-4: Doc
   
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: x


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432914681


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   @Technoboy- 
   
   > And also I think the pausedDueToCursorDataCanNotFullyPersist should be named something like BLOCKED_DISPATCHER_ON_CURSOR_DATA_NOT_FULLY_PERSIST
   because we have defined BLOCKED_DISPATCHER_ON_UNACKMSG
   
   Sure, renamed `pausedDueToCursorDataCanNotFullyPersist` to `blockedDispatcherOnCursorDataCanNotFullyPersist`
   
   @codelipenghui 
   
   > But they are different threads? Even if we have synchronized for the readMoreEntries() method. But will the afterAckMessages() method not acquire the same lock?
   
   This variable is only used to decide whether a new read should be triggered after an acknowledgment, so we only care about whether Pulsar might skip a new read by mistake.
   
   First, we only expect a new read to be triggered after dispatching has been paused at least once. There are two race-condition scenarios:
   
   **Scenario-1**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `false` now. | |
   | 1. | | There are too many ack holes, so dispatching starts to pause. |
   | 2. | Acked all messages. | |
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skips triggering a new read. | |
   | 4. | | Sets `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discards the reading task. |
   | 5. | No new read is ever triggered. | |
   
   For this scenario, making the method `afterAckMessages` `synchronized` solves the issue.
   
   **Scenario-2**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `true` now. | |
   | 1. | | There are too many ack holes, so dispatching starts to pause. |
   | 2. | Acked half of all messages. | |
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == true`, triggers a new read. | |
   | 4. | | Sets `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discards the reading task. |
   | 5. | Sets `blockedDispatcherOnCursorDataCanNotFullyPersist` to `false`. | |
   | 6. | Triggers a new read. | |
   | 7. | Acked the other half of all messages. | |
   | 8. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skips triggering a new read. | |
   
   For this scenario, step 6 leads to one of these two outcomes:
   - An actual read happens, which then leads to a new `ack`.
   - No actual read happens, so the variable `blockedDispatcherOnCursorDataCanNotFullyPersist` will be set to `false`.
   
   So, this race condition can be ignored.
   
   I made the method `afterAckMessages` `synchronized`.
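
To make the Scenario-1 fix concrete, here is a minimal sketch of the idea, not the code from the PR. The class `PausingDispatcherSketch`, the ack-hole counter, and the limit `maxPersistableHoles` are hypothetical stand-ins for the dispatcher and the cursor check. It assumes that both methods lock the same object, so the ack-side check cannot run between the reader's check and its flag update.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the dispatcher. It is not the Pulsar class.
class PausingDispatcherSketch {
    // Stand-in for the cursor state; acknowledgments update it outside the dispatcher lock.
    private final AtomicInteger ackHoles;
    // Hypothetical limit above which the cursor data can no longer be fully persisted.
    private final int maxPersistableHoles;
    // True once a read was discarded because dispatching had to pause.
    private boolean blockedOnCursorData = false;
    // Number of reads actually performed, kept only for demonstration.
    private int reads = 0;

    PausingDispatcherSketch(int initialAckHoles, int maxPersistableHoles) {
        this.ackHoles = new AtomicInteger(initialAckHoles);
        this.maxPersistableHoles = maxPersistableHoles;
    }

    private boolean isCursorDataFullyPersistable() {
        return ackHoles.get() <= maxPersistableHoles;
    }

    // Reading thread: the check and the flag update happen under one monitor.
    synchronized void readMoreEntries() {
        if (!isCursorDataFullyPersistable()) {
            blockedOnCursorData = true; // pause and remember that we paused
            return;
        }
        reads++;
    }

    // Ack thread: the cursor state changes first, then the hook runs.
    void ackAllMessages() {
        ackHoles.set(0);
        afterAckMessages();
    }

    // Same monitor as readMoreEntries(), so step 3 cannot slip in between steps 1 and 4.
    synchronized void afterAckMessages() {
        if (blockedOnCursorData && isCursorDataFullyPersistable()) {
            blockedOnCursorData = false;
            readMoreEntries(); // Java monitors are reentrant, so this nested call is safe
        }
    }

    synchronized int reads() {
        return reads;
    }

    synchronized boolean isBlocked() {
        return blockedOnCursorData;
    }
}
```

In this sketch the hook calls `readMoreEntries()` directly to keep the example small; the PR discussion refers to an asynchronous variant, which is not modelled here.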





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432431640


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,6 +129,13 @@ default boolean checkAndUnblockIfStuck() {
         return false;
     }
 
+    /**
+     * A callback hook after acknowledge messages.
+     * If acknowledge successfully, {@param position} will not be null, and {@param position} and {@param ctx} will be
+     * null.
+     * If acknowledge failed. {@param position} will be null, and {@param position} and {@param ctx} will not be null.
+     */
+    default void afterAckMessages(Object position, Throwable error, Object ctx){}

Review Comment:
   > If it's inevitable. IMO, convert from the outside is better. From the API's perspective, we should have a more clear definition
   
   Sorry, after going through the context again, I noticed that the first parameter (named `position`) should not exist at all; the meaningful name is `ctx` (the caller just sets `ctx` to a `position` when calling `deleteSync`).
   
   I changed the definition to `void afterAckMessages(Throwable error, Object ctx)`
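
A small sketch of how the two-parameter hook could be consumed. `DispatcherSketch` and `CountingDispatcher` are hypothetical names, and the convention that `error` is `null` on success is an assumption made for illustration, not something the PR states in this form.

```java
// Hypothetical, trimmed-down stand-in for the Dispatcher interface.
interface DispatcherSketch {
    /**
     * Callback hook invoked after an acknowledgment completes.
     * Assumption: error is null on success; ctx is whatever the caller passed along.
     */
    default void afterAckMessages(Throwable error, Object ctx) {
        // no-op by default, so existing dispatchers are unaffected
    }
}

// Hypothetical dispatcher that only records what the hook receives.
class CountingDispatcher implements DispatcherSketch {
    int successes;
    int failures;
    Object lastCtx;

    @Override
    public void afterAckMessages(Throwable error, Object ctx) {
        lastCtx = ctx;
        if (error == null) {
            successes++;
        } else {
            failures++;
        }
    }
}
```

Because the hook is a `default` method, implementations that do not care about acknowledgments need no changes.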





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1437035119


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -996,6 +1055,23 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
+        /**
+         * - Mark an acknowledgement were executed.
+         * - If there was no previous pause due to cursor data is too large to persist, we don't need to manually
+         *   trigger a new read. This can avoid too many CPU circles.
+         * - Clear the marker that represent delivery was paused at least once in the earlier time.
+         */
+        blockSignatureOnCursorDataCanNotFullyPersist.markNewAcknowledged();
+        if (blockSignatureOnCursorDataCanNotFullyPersist.hasPausedAtLeastOnce()
+                && cursor.isCursorDataFullyPersistable()) {
+            // clear paused count, and trigger a new reading.
+            blockSignatureOnCursorDataCanNotFullyPersist.clearMarkerPaused();
+            readMoreEntriesAsync();
+        }
+    }

Review Comment:
   I tried another approach to simplify the implementation.
   
   AtomicBoolean pausedByCursorDataPersistence;
   
   ```java
   public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
       if (pausedByCursorDataPersistence.get()) {
           if (cursor.isCursorDataFullyPersistable()) {
               if (pausedByCursorDataPersistence.compareAndSet(true, false)) {
                   // Was paused and the cursor data fits again: resume reading.
                   readMoreEntriesAsync();
               } else {
                   // Another thread changed the flag in the meantime: re-evaluate.
                   afterAckMessages(exOfDeletion, ctxOfDeletion);
               }
           }
       } else {
           if (!cursor.isCursorDataFullyPersistable()) {
               if (!pausedByCursorDataPersistence.compareAndSet(false, true)) {
                   // Another thread changed the flag in the meantime: re-evaluate.
                   afterAckMessages(exOfDeletion, ctxOfDeletion);
               }
           }
       }
   }
   ```
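
For readers who want to run the idea in isolation, the following is a self-contained variant of the suggestion above, written as a sketch under assumptions. `AckPauseStateSketch` is a hypothetical class, the cursor check and the read trigger are passed in as stand-ins, and the recursive retry is replaced by a loop, which is a swapped-in technique rather than what the reviewer wrote.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical holder for the pause flag; not a class from the PR.
class AckPauseStateSketch {
    private final AtomicBoolean pausedByCursorDataPersistence = new AtomicBoolean(false);
    // Stand-in for cursor.isCursorDataFullyPersistable().
    private final BooleanSupplier cursorDataFullyPersistable;
    // Stand-in for readMoreEntriesAsync().
    private final Runnable readMoreEntriesAsync;

    AckPauseStateSketch(BooleanSupplier cursorDataFullyPersistable, Runnable readMoreEntriesAsync) {
        this.cursorDataFullyPersistable = cursorDataFullyPersistable;
        this.readMoreEntriesAsync = readMoreEntriesAsync;
    }

    void afterAckMessages() {
        while (true) {
            boolean paused = pausedByCursorDataPersistence.get();
            boolean persistable = cursorDataFullyPersistable.getAsBoolean();
            if (paused != persistable) {
                // The flag already matches the cursor state; nothing to do.
                return;
            }
            // Flip the flag; only the thread that wins the CAS acts on the transition.
            if (pausedByCursorDataPersistence.compareAndSet(paused, !paused)) {
                if (paused) {
                    // Transition paused -> not paused: resume reading exactly once.
                    readMoreEntriesAsync.run();
                }
                return;
            }
            // CAS lost to a concurrent caller: loop and re-evaluate.
        }
    }

    boolean isPaused() {
        return pausedByCursorDataPersistence.get();
    }
}
```

The loop keeps the same four cases as the recursive form while avoiding unbounded stack growth under heavy contention.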





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435919246


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -113,6 +113,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
     protected volatile int totalUnackedMessages = 0;
+    /**
+     * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+     * Note: do not use this field to confirm whether the delivery should be paused,

Review Comment:
   The following line already recommends: `please call {@link #shouldPauseOnAckStatePersist}`.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432494860


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    protected volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   ```suggestion
       private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;
   ```





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432354290


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -366,6 +377,30 @@ protected void readMoreEntries(Consumer consumer) {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist() {
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+        if (!cursor.isMetadataTooLargeToPersist()) {
+            return false;
+        }
+        // The cursor state is too large to persist, let us check whether the read is a replay read.
+        Range<PositionImpl> lastIndividualDeletedRange = cursor.getLastIndividualDeletedRange();
+        if (lastIndividualDeletedRange == null) {
+            // lastIndividualDeletedRange is null means the read is not replay read.
+            return true;
+        }
+        // If read position is less than the last acked position, it means the read is a replay read.
+        PositionImpl lastAckedPosition = lastIndividualDeletedRange.upperEndpoint();
+        Position readPosition = cursor.getReadPosition();
+        boolean readPositionIsSmall =
+                lastAckedPosition.compareTo(readPosition.getLedgerId(), readPosition.getEntryId()) > 0;
+        return !readPositionIsSmall;
+    }
+

Review Comment:
   Sure, I reverted the change to `PersistentDispatcherSingleActiveConsumer` and added a test for this scenario: `testSingleConsumerDispatcherWillNotPause`.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1431169257


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -995,6 +1023,13 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public void afterAckMessages(Object position, Throwable error, Object ctx) {
+        if (!cursor.isMetadataTooLargeToPersist()) {

Review Comment:
   Use `shouldPauseOnAckStatePersist` directly?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -354,6 +369,19 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        if (readType != ReadType.Normal) {
+            return false;
+        }

Review Comment:
   Why not pause replay reads as well? Acknowledging replayed messages also increases the size of `individualDeletedMessages`.








Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435834659


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
      * @return whether this cursor is closed.
      */
     boolean isClosed();
+
+    default boolean isCursorDataFullyPersistable() {
+        return true;

Review Comment:
   `isCursorDataFullyPersistable` means the cursor data is small enough to be fully persisted.
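
As an illustration of that meaning, here is a sketch of the default method together with one possible override. `ManagedCursorSketch`, `RangeCountingCursor`, and the limit `maxRangesToPersist` are hypothetical; the comparison against a range-count limit is an assumption about how an implementation might decide, not a quote from the PR.

```java
// Hypothetical, trimmed-down stand-in for the ManagedCursor interface.
interface ManagedCursorSketch {
    // Same default as in the diff: a cursor that does not track a limit reports "persistable".
    default boolean isCursorDataFullyPersistable() {
        return true;
    }
}

// Hypothetical implementation that counts individually deleted ranges ("ack holes").
class RangeCountingCursor implements ManagedCursorSketch {
    // Assumed limit on how many ranges can be written to persistent storage.
    private final int maxRangesToPersist;
    private int individualDeletedRanges;

    RangeCountingCursor(int maxRangesToPersist) {
        this.maxRangesToPersist = maxRangesToPersist;
    }

    void setIndividualDeletedRanges(int count) {
        this.individualDeletedRanges = count;
    }

    @Override
    public boolean isCursorDataFullyPersistable() {
        // Fully persistable while the number of ranges stays within the limit.
        return individualDeletedRanges <= maxRangesToPersist;
    }
}
```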





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432583611


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   Two places update this field:
   
   1. readMoreEntries()
   2. afterAckMessages()
   
   But they run on different threads, don't they? Even though the `readMoreEntries()` method is `synchronized`, the `afterAckMessages()` method does not acquire the same lock.








Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435950180


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -113,6 +113,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
     protected volatile int totalUnackedMessages = 0;
+    /**
+     * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+     * Note: do not use this field to confirm whether the delivery should be paused,

Review Comment:
   > A tool to help the dispatcher check whether the delivery should be paused.
   
   No, it is not. It is a tool that helps determine whether a new read should be triggered after acknowledgments. Do not use it to confirm whether delivery should be paused; call {@link #shouldPauseOnAckStatePersist} instead.
   
   I updated the description.





Re: [PR] [improve broker] Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode closed pull request #21423: [improve broker] Stop dispatch messages if the individual acks will be lost in the persistent storage
URL: https://github.com/apache/pulsar/pull/21423




Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435909754


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -113,6 +113,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
     protected volatile int totalUnackedMessages = 0;
+    /**
+     * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+     * Note: do not use this field to confirm whether the delivery should be paused,

Review Comment:
   I know that, but this note seems strange.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435836234


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/BlockDispatcherSignatureOnCursorDataCanNotFullyPersist.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+/***
+ * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ *      please call {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
+ */
+public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {
+
+    /**
+     * Used to mark that dispatching was paused at least once in the earlier time, due to the cursor data can not be
+     * fully persistent.
+     * Why need this filed? It just prevents that
+     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} calls
+     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} every time, it can avoid too many CPU circles.
+     * We just call {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} after the dispatching has been
+     * paused at least once earlier.
+     */
+    private volatile boolean markerPausedAtLeastOnce;
+
+    /**
+     * Used to mark some acknowledgements were executed.
+     * Because there is a race condition might cause dispatching stuck, the steps to reproduce the issue is like below:
+     * - {@link #markerPausedAtLeastOnce} is "false" now.
+     * - Thread-reading-entries: there are too many ack holes, so start to pause dispatching
+     * - Thread-ack: acked all messages.

Review Comment:
   Once `afterAckMessages` is executed, the acknowledgment task has already finished, right? Then the check `cursor.isCursorDataFullyPersistable()` in both `afterAckMessages` and `shouldPauseOnAckStatePersist` will return the same result, right?





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435835369


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -113,6 +113,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
     protected volatile int totalUnackedMessages = 0;
+    /**
+     * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+     * Note: do not use this field to confirm whether the delivery should be paused,

Review Comment:
   If you use a state to indicate whether the delivery should be paused, you must update it in time. It is extremely difficult to implement "update state in time" without locking.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432940653


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -359,6 +382,20 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        // Allows new consumers to consume redelivered messages caused by the just-closed consumer.
+        if (readType != ReadType.Normal) {
+            return false;
+        }
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;

Review Comment:
   Thanks for mentioning me, I added a task named "part-4: Topic policies support" in the `Motivation & Modifications`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -456,17 +456,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
 
     private final DeleteCallback deleteCallback = new DeleteCallback() {
         @Override
-        public void deleteComplete(Object position) {
+        public void deleteComplete(Object context) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
+                // The value of the param "context" is a position.
+                log.debug("[{}][{}] Deleted message at {}", topicName, subName, context);
             }
             // Signal the dispatchers to give chance to take extra actions
-            notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position);
+            dispatcher.afterAckMessages(null, context);

Review Comment:
   Fixed, thanks





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432340799


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
      * @return whether this cursor is closed.
      */
     boolean isClosed();
+
+    default boolean isMetadataTooLargeToPersist() {

Review Comment:
   Good suggestion. Renamed.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432354290


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -366,6 +377,30 @@ protected void readMoreEntries(Consumer consumer) {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist() {
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+        if (!cursor.isMetadataTooLargeToPersist()) {
+            return false;
+        }
+        // The cursor state is too large to persist, let us check whether the read is a replay read.
+        Range<PositionImpl> lastIndividualDeletedRange = cursor.getLastIndividualDeletedRange();
+        if (lastIndividualDeletedRange == null) {
+            // lastIndividualDeletedRange is null means the read is not replay read.
+            return true;
+        }
+        // If read position is less than the last acked position, it means the read is a replay read.
+        PositionImpl lastAckedPosition = lastIndividualDeletedRange.upperEndpoint();
+        Position readPosition = cursor.getReadPosition();
+        boolean readPositionIsSmall =
+                lastAckedPosition.compareTo(readPosition.getLedgerId(), readPosition.getEntryId()) > 0;
+        return !readPositionIsSmall;
+    }
+

Review Comment:
   Sure, reverted the change of `PersistentDispatcherSingleActiveConsumer`





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435836411


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -359,6 +382,41 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        // Allows new consumers to consume redelivered messages caused by the just-closed consumer.
+        if (readType != ReadType.Normal) {
+            return false;
+        }
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+
+        /**
+         * Double check for "Cursor data can be fully persistent".
+         * - Clear the marker that represent some acknowledgements were executed.
+         * - Check whether dispatching should be paused due to cursor data is too large to persistent.
+         * - If dispatching should be paused, but some acknowledgements have been executed, re-calculate the result.
+         * - Mark delivery was paused at least once.
+         */
+        // Clear the marker that represent some acknowledgements were executed.
+        blockSignatureOnCursorDataCanNotFullyPersist.clearMakerNewAcknowledged();

Review Comment:
   I could not fully understand this comment. Could you explain in more detail?





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435919797


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/BlockDispatcherSignatureOnCursorDataCanNotFullyPersist.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+/***
+ * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ *      please call {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
+ */
+public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {
+
+    /**
+     * Used to mark that dispatching was paused at least once in the earlier time, due to the cursor data can not be
+     * fully persistent.
+     * Why need this filed? It just prevents that
+     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} calls
+     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} every time, it can avoid too many CPU circles.
+     * We just call {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} after the dispatching has been
+     * paused at least once earlier.
+     */
+    private volatile boolean markerPausedAtLeastOnce;
+
+    /**
+     * Used to mark some acknowledgements were executed.
+     * Because there is a race condition might cause dispatching stuck, the steps to reproduce the issue is like below:
+     * - {@link #markerPausedAtLeastOnce} is "false" now.
+     * - Thread-reading-entries: there are too many ack holes, so start to pause dispatching
+     * - Thread-ack: acked all messages.

Review Comment:
   - First, `afterAckMessages` is called after the acknowledgment.
   - `cursor.isCursorDataFullyPersistable()` in `shouldPauseOnAckStatePersist` is executed before the acknowledgment.
   
   Do you mean there is more than one acknowledgment? The second `afterAckMessages` will change the marker `newAcknowledged` to `true`, right?
   





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1433229187


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -996,6 +1033,17 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public synchronized void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {

Review Comment:
   This is a hot path in Pulsar. Is it better to use compareAndSet() with AtomicBoolean to avoid any performance degradation?
   
   And it's better to remove `volatile` since you already have `synchronized` here.
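
A minimal sketch of the `compareAndSet()` idea suggested above, not the PR's actual code. The class `PauseGateSketch`, its methods, and the counter standing in for `readMoreEntries()` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the dispatcher; only the flag handling is shown.
class PauseGateSketch {
    // True once dispatching has been paused because cursor data could not be fully persisted.
    private final AtomicBoolean pausedAtLeastOnce = new AtomicBoolean(false);
    // Counts how often a new read was requested; stands in for readMoreEntries().
    final AtomicInteger readsTriggered = new AtomicInteger();

    // Called by the reading thread when it decides to pause dispatching.
    void markPaused() {
        pausedAtLeastOnce.set(true);
    }

    // Called on the ack path; lock-free, so it stays cheap when nothing was paused.
    void afterAckMessages() {
        // Only the caller that flips true -> false requests the new read.
        if (pausedAtLeastOnce.compareAndSet(true, false)) {
            readsTriggered.incrementAndGet();
        }
    }
}
```

The ack path takes no lock here. On its own, this does not order the flag update against the cursor check made by the reading thread, so it may not be enough to close the race conditions discussed elsewhere in this thread.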





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432914681


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   @Technoboy- 
   
   > And also I think the pausedDueToCursorDataCanNotFullyPersist should be named something like BLOCKED_DISPATCHER_ON_CURSOR_DATA_NOT_FULLY_PERSIST
   because we have defined BLOCKED_DISPATCHER_ON_UNACKMSG
   
   Sure, renamed `pausedDueToCursorDataCanNotFullyPersist` to `blockedDispatcherOnCursorDataCanNotFullyPersist`





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1433991562


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/BlockDispatcherSignatureOnCursorDataCanNotFullyPersist.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+/***
+ * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ *      please call {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
+ */
+public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {
+
+    /**
+     * Used to mark that dispatching was paused at least once in the earlier time, due to the cursor data can not be
+     * fully persistent.
+     * Why need this filed? It just prevents that
+     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} calls
+     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} every time, it can avoid too many CPU circles.
+     * We just call {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} after the dispatching has been
+     * paused at least once earlier.
+     */
+    private volatile boolean markerAtLeastPausedOnce;

Review Comment:
   markerPausedAtLeastOnce





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1433473238


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -996,6 +1033,17 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public synchronized void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {

Review Comment:
   Yes, I agree with using `AtomicIntegerFieldUpdater` to update `blockedDispatcherOnCursorDataCanNotFullyPersist`.
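
A sketch of how an `AtomicIntegerFieldUpdater` could guard such a flag, following the `volatile int` plus static updater pattern that the diff above shows for `totalUnackedMessages`. The class `BlockedFlagSketch`, the field `blocked`, and the `FALSE`/`TRUE` constants are hypothetical and are not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical holder for the "blocked" flag; names are illustrative only.
class BlockedFlagSketch {
    static final int FALSE = 0;
    static final int TRUE = 1;

    // The updater requires the target field to be a volatile int.
    private static final AtomicIntegerFieldUpdater<BlockedFlagSketch> BLOCKED_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(BlockedFlagSketch.class, "blocked");
    private volatile int blocked = FALSE;

    // Returns true only for the caller that actually changed FALSE -> TRUE.
    boolean block() {
        return BLOCKED_UPDATER.compareAndSet(this, FALSE, TRUE);
    }

    // Returns true only for the caller that actually changed TRUE -> FALSE.
    boolean unblock() {
        return BLOCKED_UPDATER.compareAndSet(this, TRUE, FALSE);
    }

    boolean isBlocked() {
        return BLOCKED_UPDATER.get(this) == TRUE;
    }
}
```

Compared with an `AtomicBoolean` field, the updater avoids one extra object per dispatcher, at the cost of a reflective lookup of the field by name when the class is initialized.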





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432646120


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   And also I think the `pausedDueToCursorDataCanNotFullyPersist`  should be named something like `BLOCKED_DISPATCHER_ON_CURSOR_DATA_NOT_FULLY_PERSIST`
   because we have defined `BLOCKED_DISPATCHER_ON_UNACKMSG`





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435943914


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/BlockDispatcherSignatureOnCursorDataCanNotFullyPersist.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+/***
+ * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ *      please call {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
+ */
+public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {
+
+    /**
+     * Used to mark that dispatching was paused at least once in the earlier time, due to the cursor data can not be
+     * fully persistent.
+     * Why need this filed? It just prevents that
+     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} calls
+     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} every time, it can avoid too many CPU circles.
+     * We just call {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} after the dispatching has been
+     * paused at least once earlier.
+     */
+    private volatile boolean markerPausedAtLeastOnce;
+
+    /**
+     * Used to mark some acknowledgements were executed.
+     * Because there is a race condition might cause dispatching stuck, the steps to reproduce the issue is like below:
+     * - {@link #markerPausedAtLeastOnce} is "false" now.
+     * - Thread-reading-entries: there are too many ack holes, so start to pause dispatching
+     * - Thread-ack: acked all messages.

Review Comment:
   Oh, I got it.
   `clearMakerNewAcknowledged` is executed before `cursor.isCursorDataFullyPersistable`.
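
The ordering noted above can be sketched as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: `DoubleCheckSketch`, `onAck`, and `shouldPause` are hypothetical names, the cursor check is passed in as a `BooleanSupplier`, and the position of the "paused at least once" marking relative to the re-check is an assumption of this sketch.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of the "clear marker, check, re-check" sequence.
class DoubleCheckSketch {
    private volatile boolean newAcknowledged;
    private volatile boolean pausedAtLeastOnce;

    // Ack path: record that some acknowledgment has been executed.
    void onAck() {
        newAcknowledged = true;
    }

    boolean wasPausedAtLeastOnce() {
        return pausedAtLeastOnce;
    }

    // Reading path: decide whether dispatching should pause.
    boolean shouldPause(BooleanSupplier cursorDataFullyPersistable) {
        // 1. Clear the marker BEFORE looking at the cursor state, so any ack
        //    that lands after this point is still visible in step 4.
        newAcknowledged = false;
        // 2. First check of the cursor state.
        if (cursorDataFullyPersistable.getAsBoolean()) {
            return false;
        }
        // 3. Remember that a pause was decided at least once.
        pausedAtLeastOnce = true;
        // 4. If an ack raced in since step 1, the first result may be stale: re-check.
        if (newAcknowledged) {
            return !cursorDataFullyPersistable.getAsBoolean();
        }
        return true;
    }
}
```

If the marker were cleared after the cursor check instead, an acknowledgment arriving between the check and the clear would be erased and the stale "pause" decision would stand.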





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432914681


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   @Technoboy- 
   
   > And also I think the pausedDueToCursorDataCanNotFullyPersist should be named something like BLOCKED_DISPATCHER_ON_CURSOR_DATA_NOT_FULLY_PERSIST
   because we have defined BLOCKED_DISPATCHER_ON_UNACKMSG
   
   Sure, renamed `pausedDueToCursorDataCanNotFullyPersist` to `blockedDispatcherOnCursorDataCanNotFullyPersist`
   
   @codelipenghui 
   
   > But they are different threads? Even if we have synchronized for the readMoreEntries() method. But will the afterAckMessages() method not acquire the same lock?
   
   This variable is just used to confirm whether it should trigger a new reading after any acknowledgment. So we only care about whether Pulsar will skip a new reading in mistake.
   
   Firstly, we only expected the new reading to be triggered after the dispatching has been paused at least once. There are two scenarios with race conditions:
   
   **Scenario-1**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `false` now. |
   | 1. | | There are too many ack holes, so start to pause dispatching  |
   | 2. | Acked all messages | 
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skip to trigger a new reading |
   | 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the reading task. |
   | 5. | No longer to trigger a new reading |
   
   For this scenario, adding a lock `synchronized` on the method `afterAckMessages` can solve the issue.
   
   **Scenario-2**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `true` now. | |
   | 1. | | There are too many ack holes, so dispatching starts to pause. |
   | 2. | Acked half of all messages. | |
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == true`, trigger a new read. | |
   | 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the read task. |
   | 5. | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `false`. | |
   | 6. | Trigger a new read. | |
   | 7. | Acked the other half of all messages. | |
   | 8. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skip triggering a new read. | |
   
   For this scenario, `step 6` leads to one of these two outcomes:
   - An actual read happens, which then results in a new `ack`.
   - No actual read happens, so the variable `blockedDispatcherOnCursorDataCanNotFullyPersist` will be set to `false`.
   
   So, this race condition can be ignored.
   
   ---
   
   ### Summary
   I made the method `afterAckMessages` `synchronized`.
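   
   To illustrate why sharing one lock closes the Scenario-1 window, here is a minimal sketch. It is not the dispatcher code from this PR: the class name `SynchronizedAckSketch`, the `ackHoles`/`maxAckHoles` counters, and the direct call to `readMoreEntries()` (the real code triggers the read asynchronously and asks the cursor about its state) are all assumptions made for the example.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical, simplified stand-in for the dispatcher; not the real Pulsar class.
   class SynchronizedAckSketch {
       private final int maxAckHoles;   // assumed threshold above which cursor data cannot fully persist
       private int ackHoles;            // assumed stand-in for the cursor's individual-ack state
       private boolean blockedOnCursorData = false;
       final AtomicInteger readsIssued = new AtomicInteger();
   
       SynchronizedAckSketch(int ackHoles, int maxAckHoles) {
           this.ackHoles = ackHoles;
           this.maxAckHoles = maxAckHoles;
       }
   
       // Read path: the check and the flag update happen under the object's monitor.
       synchronized void readMoreEntries() {
           if (ackHoles > maxAckHoles) {
               blockedOnCursorData = true;  // remember that dispatching was paused
               return;                      // discard this read task
           }
           readsIssued.incrementAndGet();
       }
   
       // Ack path: takes the same monitor, so it cannot run between the check
       // and the flag update in readMoreEntries().
       synchronized void afterAckMessages(int holesClosed) {
           ackHoles = Math.max(0, ackHoles - holesClosed);
           if (blockedOnCursorData && ackHoles <= maxAckHoles) {
               blockedOnCursorData = false;
               readMoreEntries();           // monitors are reentrant, so this nested call is safe
           }
       }
   }
   ```
   
   With both methods synchronized, an ack either runs entirely before the pause decision (so the read is not paused) or entirely after the flag is set (so the ack resumes reading).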





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432423593


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -995,6 +1023,13 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public void afterAckMessages(Object position, Throwable error, Object ctx) {
+        if (!cursor.isMetadataTooLargeToPersist()) {

Review Comment:
   @coderzc 
   
   > Use shouldPauseOnAckStatePersist directly?
   
   If we call `shouldPauseOnAckStatePersist(ReadType.Normal)` here, the param `ReadType.Normal` is confusing.
   
   
   @codelipenghui 
   
   > Will it waste too many CPU cycles if cursor.isMetadataTooLargeToPersist() is always false? The method readMoreEntriesAsync will be executed by another thread, and it does a lot of checks on each call.
   > I think what we want is this: if the dispatcher was previously paused because of ack state persistence, and the ack state can now be persisted again, we should resume message dispatching.
   
   Good suggestion. I added a counter that indicates how many times dispatching has been paused, and `readMoreEntries` is called only when the counter is larger than `0`.
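   
   A rough sketch of the counter idea, to make the gating concrete. The names `PauseCounterSketch`, `onDispatchPaused` and `resumeAttempts` are hypothetical, and the boolean argument stands in for the cursor check; the actual change in the PR may be structured differently.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical sketch: only attempt a resume if dispatching was paused before.
   class PauseCounterSketch {
       private final AtomicInteger pausedCount = new AtomicInteger();
       // Counts how often a resume read would have been triggered (stands in for readMoreEntries()).
       final AtomicInteger resumeAttempts = new AtomicInteger();
   
       // Called by the read path whenever it decides to pause dispatching.
       void onDispatchPaused() {
           pausedCount.incrementAndGet();
       }
   
       // Called after each acknowledgment. The cheap counter check comes first,
       // so the common "never paused" case does no further work.
       void afterAckMessages(boolean cursorDataFullyPersistable) {
           if (pausedCount.get() > 0 && cursorDataFullyPersistable) {
               pausedCount.set(0);
               resumeAttempts.incrementAndGet();
           }
       }
   }
   ```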
   





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432348749


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -354,6 +369,19 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        if (readType != ReadType.Normal) {
+            return false;
+        }

Review Comment:
   > @coderzc Why don't you pause replay read? acknowledge replay message will also increase size of individualDeletedMessages.
   > @codelipenghui I think the reason should be consumers can disconnect from the topic if we stop dispatching messages to the active consumer. Somehow, the dispatching will stop forever.
   
   Yes, what @codelipenghui said is right.
   
   > could you please add some comments here? 
   
   Added.
   
   > we should also add test for the consumer disconnection to make sure the remaining active consumers can ultimately consume all messages
   
   Sure, they are `testPauseOnAckStatPersistNotAffectReplayRead` and `testMultiConsumersPauseOnAckStatPersistNotAffectReplayRead`
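   
   As a small illustration of the rule discussed here (replay reads are never paused), the decision can be sketched as a pure function. The enum and the two boolean parameters are stand-ins invented for the example; in the PR the values come from the topic configuration and the cursor.
   
   ```java
   // Hypothetical sketch of the pause decision; not the method from the PR.
   class ReplayReadSketch {
       enum ReadType { Normal, Replay }
   
       static boolean shouldPauseOnAckStatePersist(ReadType readType,
                                                   boolean featureEnabled,
                                                   boolean cursorDataFullyPersistable) {
           // Replay reads must keep flowing so that messages of a disconnected
           // consumer can be redelivered to the remaining consumers.
           if (readType != ReadType.Normal) {
               return false;
           }
           if (!featureEnabled) {
               return false;
           }
           return !cursorDataFullyPersistable;
       }
   }
   ```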





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432073401


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
      * @return whether this cursor is closed.
      */
     boolean isClosed();
+
+    default boolean isMetadataTooLargeToPersist() {

Review Comment:
   I would suggest renaming it to `isCursorDataFullyPersistable()`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -366,6 +377,30 @@ protected void readMoreEntries(Consumer consumer) {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist() {
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+        if (!cursor.isMetadataTooLargeToPersist()) {
+            return false;
+        }
+        // The cursor state is too large to persist, let us check whether the read is a replay read.
+        Range<PositionImpl> lastIndividualDeletedRange = cursor.getLastIndividualDeletedRange();
+        if (lastIndividualDeletedRange == null) {
+            // lastIndividualDeletedRange is null means the read is not replay read.
+            return true;
+        }
+        // If read position is less than the last acked position, it means the read is a replay read.
+        PositionImpl lastAckedPosition = lastIndividualDeletedRange.upperEndpoint();
+        Position readPosition = cursor.getReadPosition();
+        boolean readPositionIsSmall =
+                lastAckedPosition.compareTo(readPosition.getLedgerId(), readPosition.getEntryId()) > 0;
+        return !readPositionIsSmall;
+    }
+

Review Comment:
   Hmm. For the single active consumer subscription, I think in practice we don't need to consider ack state persistence, because running into it there would be a misuse. Maybe we can skip the single active consumer subscription for now, and add it later if there is an actual need.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,6 +129,13 @@ default boolean checkAndUnblockIfStuck() {
         return false;
     }
 
+    /**
+     * A callback hook after acknowledge messages.
+     * If acknowledge successfully, {@param position} will not be null, and {@param position} and {@param ctx} will be
+     * null.
+     * If acknowledge failed. {@param position} will be null, and {@param position} and {@param ctx} will not be null.
+     */
+    default void afterAckMessages(Object position, Throwable error, Object ctx){}

Review Comment:
   If it's inevitable, then IMO converting from the outside is better. From the API's perspective, we should have a clearer definition.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -354,6 +369,19 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        if (readType != ReadType.Normal) {
+            return false;
+        }

Review Comment:
   I think the reason is that consumers can disconnect from the topic. If we then stop dispatching messages to the active consumers, dispatching could stop forever.
   
   @poorbarcode could you please add some comments here? We should also add a test for consumer disconnection to make sure the remaining active consumers can ultimately consume all messages.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,6 +129,13 @@ default boolean checkAndUnblockIfStuck() {
         return false;
     }
 
+    /**
+     * A callback hook after acknowledge messages.
+     * If acknowledge successfully, {@param position} will not be null, and {@param position} and {@param ctx} will be
+     * null.
+     * If acknowledge failed. {@param position} will be null, and {@param position} and {@param ctx} will not be null.
+     */
+    default void afterAckMessages(Object position, Throwable error, Object ctx){}

Review Comment:
   Why should the type of `position` be defined as `Object`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -995,6 +1023,13 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public void afterAckMessages(Object position, Throwable error, Object ctx) {
+        if (!cursor.isMetadataTooLargeToPersist()) {

Review Comment:
   Will it waste too many CPU cycles if `cursor.isMetadataTooLargeToPersist()` is always false? The method `readMoreEntriesAsync` will be executed by another thread, and it does a lot of checks on each call.
   
   I think what we want is this: if the dispatcher was previously paused because of ack state persistence, and the ack state can now be persisted again, we should resume message dispatching.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432914681


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   @Technoboy- 
   
   > And also I think the pausedDueToCursorDataCanNotFullyPersist should be named something like BLOCKED_DISPATCHER_ON_CURSOR_DATA_NOT_FULLY_PERSIST
   > because we have defined BLOCKED_DISPATCHER_ON_UNACKMSG
   
   Sure, renamed `pausedDueToCursorDataCanNotFullyPersist` to `blockedDispatcherOnCursorDataCanNotFullyPersist`
   
   @codelipenghui 
   
   > But they are different threads? Even if we have synchronized for the readMoreEntries() method. But will the afterAckMessages() method not acquire the same lock?
   
   This variable is only used to decide whether an acknowledgment should trigger a new read. So the only thing we care about is whether Pulsar will skip a new read by mistake.
   
   First, we only expect a new read to be triggered after dispatching has been paused at least once. There are two scenarios with race conditions:
   
   **Scenario-1**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `false` now. | |
   | 1. | | There are too many ack holes, so dispatching starts to pause. |
   | 2. | Acked all messages. | |
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skip triggering a new read. | |
   | 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the read task. |
   
   **Scenario-2**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `true` now. | |
   | 1. | | There are too many ack holes, so dispatching starts to pause. |
   | 2. | Acked half of all messages. | |
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == true`, trigger a new read. | |
   | 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the read task. |
   | 5. | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `false`. | |
   | 6. | Trigger a new read. | |
   | 7. | Acked the other half of all messages. | |
   | 8. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skip triggering a new read. | |





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432636470


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -456,17 +456,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
 
     private final DeleteCallback deleteCallback = new DeleteCallback() {
         @Override
-        public void deleteComplete(Object position) {
+        public void deleteComplete(Object context) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
+                // The value of the param "context" is a position.
+                log.debug("[{}][{}] Deleted message at {}", topicName, subName, context);
             }
             // Signal the dispatchers to give chance to take extra actions
-            notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position);
+            dispatcher.afterAckMessages(null, context);

Review Comment:
   We should check whether `dispatcher` is null here.
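   
   One possible shape for such a guard, as a sketch only. `DeleteCallbackSketch` and its nested `Dispatcher` interface are hypothetical stand-ins, not the classes touched by this PR; the point is just to read the field once into a local before the null check.
   
   ```java
   // Hypothetical stand-in for the subscription's delete callback.
   class DeleteCallbackSketch {
       // Assumed minimal view of the dispatcher hook used in the diff above.
       interface Dispatcher {
           void afterAckMessages(Throwable error, Object ctx);
       }
   
       // May be null, for example before any consumer has connected (assumption).
       private volatile Dispatcher dispatcher;
   
       void setDispatcher(Dispatcher dispatcher) {
           this.dispatcher = dispatcher;
       }
   
       // Returns true if the dispatcher was notified.
       boolean deleteComplete(Object context) {
           // Read the volatile field once so the check and the call see the same reference.
           Dispatcher current = dispatcher;
           if (current == null) {
               return false;
           }
           current.afterAckMessages(null, context);
           return true;
       }
   }
   ```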





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435829047


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -359,6 +382,41 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        // Allows new consumers to consume redelivered messages caused by the just-closed consumer.
+        if (readType != ReadType.Normal) {
+            return false;
+        }
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+
+        /**
+         * Double check for "Cursor data can be fully persistent".
+         * - Clear the marker that represent some acknowledgements were executed.
+         * - Check whether dispatching should be paused due to cursor data is too large to persistent.
+         * - If dispatching should be paused, but some acknowledgements have been executed, re-calculate the result.
+         * - Mark delivery was paused at least once.
+         */
+        // Clear the marker that represent some acknowledgements were executed.
+        blockSignatureOnCursorDataCanNotFullyPersist.clearMakerNewAcknowledged();

Review Comment:
   Maybe you should move `blockSignatureOnCursorDataCanNotFullyPersist.clearMakerNewAcknowledged();` after `blockSignatureOnCursorDataCanNotFullyPersist.markPaused();`. That will trigger `readMoreEntriesAsync` one extra time, but no read will be lost.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435946350


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -359,6 +382,41 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        // Allows new consumers to consume redelivered messages caused by the just-closed consumer.
+        if (readType != ReadType.Normal) {
+            return false;
+        }
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+
+        /**
+         * Double check for "Cursor data can be fully persistent".
+         * - Clear the marker that represent some acknowledgements were executed.
+         * - Check whether dispatching should be paused due to cursor data is too large to persistent.
+         * - If dispatching should be paused, but some acknowledgements have been executed, re-calculate the result.
+         * - Mark delivery was paused at least once.
+         */
+        // Clear the marker that represent some acknowledgements were executed.
+        blockSignatureOnCursorDataCanNotFullyPersist.clearMakerNewAcknowledged();

Review Comment:
   This is following the [comment](https://github.com/apache/pulsar/pull/21423/files/560944a7409df2cce497e8432da2b95b61d25325#diff-89265294ca675086422dde3f59e67ef298e7b7eea4d1dc8d28de7b9d60f92c0b). The original approach is fine.





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435792739


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
      * @return whether this cursor is closed.
      */
     boolean isClosed();
+
+    default boolean isCursorDataFullyPersistable() {
+        return true;

Review Comment:
   Why is the default value `true`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/BlockDispatcherSignatureOnCursorDataCanNotFullyPersist.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+/***
+ * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ *      please call {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
+ */
+public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {
+
+    /**
+     * Used to mark that dispatching was paused at least once in the earlier time, due to the cursor data can not be
+     * fully persistent.
+     * Why need this filed? It just prevents that
+     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} calls
+     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} every time, it can avoid too many CPU circles.
+     * We just call {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} after the dispatching has been
+     * paused at least once earlier.
+     */
+    private volatile boolean markerPausedAtLeastOnce;
+
+    /**
+     * Used to mark some acknowledgements were executed.
+     * Because there is a race condition might cause dispatching stuck, the steps to reproduce the issue is like below:
+     * - {@link #markerPausedAtLeastOnce} is "false" now.
+     * - Thread-reading-entries: there are too many ack holes, so start to pause dispatching
+     * - Thread-ack: acked all messages.

Review Comment:
   This does not seem to have any effect with the following order of calls:
   1. `afterAckMessages`:
   ```java
   blockSignatureOnCursorDataCanNotFullyPersist.markNewAcknowledged();
   ```
   2. `shouldPauseOnAckStatePersist`:
   ```java
   blockSignatureOnCursorDataCanNotFullyPersist.clearMakerNewAcknowledged();
   ```
   3. `afterAckMessages`:
   ```java
   if (blockSignatureOnCursorDataCanNotFullyPersist.hasPausedAtLeastOnce()
           && cursor.isCursorDataFullyPersistable()) {
       // clear paused count, and trigger a new reading.
       blockSignatureOnCursorDataCanNotFullyPersist.clearMarkerPaused();
       readMoreEntriesAsync();
   }
   ```
   4. `shouldPauseOnAckStatePersist`:
   ```java
   blockSignatureOnCursorDataCanNotFullyPersist.markPaused();
   ```
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -113,6 +113,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
     protected volatile int totalUnackedMessages = 0;
+    /**
+     * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+     * Note: do not use this field to confirm whether the delivery should be paused,

Review Comment:
   Why not? Could you please add an explanation here?





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1433819191


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -996,6 +1033,17 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public synchronized void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {

Review Comment:
   I created a stat class `BlockDispatcherSignatureOnCursorDataCanNotFullyPersist` to avoid using `synchronized`. Please take another look.
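   
   For readers following along, a lock-free holder of this kind could look roughly like the sketch below. It is an approximation based on the method names and the "double check" comment quoted in this thread, not the class from the PR: `BlockSignatureSketch`, `hasNewAcknowledged` and the `BooleanSupplier` standing in for `cursor.isCursorDataFullyPersistable()` are assumptions, and the sketch does not claim to cover every possible interleaving.
   
   ```java
   import java.util.function.BooleanSupplier;
   
   // Hypothetical sketch of a two-flag holder plus the double check on the read path.
   class BlockSignatureSketch {
       private volatile boolean pausedAtLeastOnce;
       private volatile boolean newAcknowledged;
   
       void markPaused() { pausedAtLeastOnce = true; }
       void clearMarkerPaused() { pausedAtLeastOnce = false; }
       boolean hasPausedAtLeastOnce() { return pausedAtLeastOnce; }
   
       void markNewAcknowledged() { newAcknowledged = true; }
       void clearMarkerNewAcknowledged() { newAcknowledged = false; }
       boolean hasNewAcknowledged() { return newAcknowledged; }
   
       // Read path: decide whether to pause, re-checking once if an ack arrived meanwhile.
       static boolean shouldPause(BlockSignatureSketch sig, BooleanSupplier cursorDataFullyPersistable) {
           // 1. Forget acks that happened before this check started.
           sig.clearMarkerNewAcknowledged();
           // 2. First check of the cursor state.
           boolean pause = !cursorDataFullyPersistable.getAsBoolean();
           // 3. If an ack was recorded since step 1, the first result may be stale: check again.
           if (pause && sig.hasNewAcknowledged()) {
               pause = !cursorDataFullyPersistable.getAsBoolean();
           }
           // 4. Remember the pause so that a later ack knows it has to trigger a read.
           if (pause) {
               sig.markPaused();
           }
           return pause;
       }
   }
   ```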





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1434004975


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/BlockDispatcherSignatureOnCursorDataCanNotFullyPersist.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+/***
+ * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ *      please call {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
+ */
+public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {
+
+    /**
+     * Used to mark that dispatching was paused at least once in the earlier time, due to the cursor data can not be
+     * fully persistent.
+     * Why need this filed? It just prevents that
+     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} calls
+     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} every time, it can avoid too many CPU circles.
+     * We just call {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} after the dispatching has been
+     * paused at least once earlier.
+     */
+    private volatile boolean markerAtLeastPausedOnce;

Review Comment:
   Renamed





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432651352


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -359,6 +382,20 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        // Allows new consumers to consume redelivered messages caused by the just-closed consumer.
+        if (readType != ReadType.Normal) {
+            return false;
+        }
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;

Review Comment:
   We get this value from the topic policies. Where do we update this value?





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435942994


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -113,6 +113,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "totalUnackedMessages");
     protected volatile int totalUnackedMessages = 0;
+    /**
+     * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
+     * Note: do not use this field to confirm whether the delivery should be paused,

Review Comment:
   Maybe the following note is better?
   ```
   A tool to help the dispatcher check whether the delivery should be paused.
   ```





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435946350


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -359,6 +382,41 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        // Allows new consumers to consume redelivered messages caused by the just-closed consumer.
+        if (readType != ReadType.Normal) {
+            return false;
+        }
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+
+        /**
+         * Double check for "Cursor data can be fully persisted".
+         * - Clear the marker that represents that some acknowledgements were executed.
+         * - Check whether dispatching should be paused because the cursor data is too large to persist.
+         * - If dispatching should be paused, but some acknowledgements have been executed, re-calculate the result.
+         * - Mark that delivery was paused at least once.
+         */
+        // Clear the marker that represents that some acknowledgements were executed.
+        blockSignatureOnCursorDataCanNotFullyPersist.clearMakerNewAcknowledged();

Review Comment:
   This follows from the [comment](https://github.com/apache/pulsar/pull/21423#discussion_r1435943914). The original approach is fine.
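
The check order listed in the quoted Javadoc can be illustrated with a minimal sketch. It assumes a simplified model: `PauseCheckSketch`, its fields, and the `BooleanSupplier` standing in for `cursor.isCursorDataFullyPersistable()` are hypothetical names, the `cursor == null` guard is left out, and the pause is recorded before the re-check, which is a choice of this sketch and may differ from the PR.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the dispatcher-side check; not the Pulsar class. */
final class PauseCheckSketch {
    // Set by the ack path whenever an acknowledgement has been processed.
    private volatile boolean newAcknowledged;
    // Set when dispatching has been paused at least once.
    private volatile boolean pausedAtLeastOnce;

    private final boolean pauseFeatureEnabled;
    private final BooleanSupplier cursorFullyPersistable;

    PauseCheckSketch(boolean pauseFeatureEnabled, BooleanSupplier cursorFullyPersistable) {
        this.pauseFeatureEnabled = pauseFeatureEnabled;
        this.cursorFullyPersistable = cursorFullyPersistable;
    }

    void markNewAcknowledged() {
        newAcknowledged = true;
    }

    boolean hasPausedAtLeastOnce() {
        return pausedAtLeastOnce;
    }

    boolean shouldPause(boolean normalRead) {
        // Replay reads are always allowed, and a disabled feature never pauses.
        if (!normalRead || !pauseFeatureEnabled) {
            return false;
        }
        newAcknowledged = false;                      // 1. clear the ack marker
        if (cursorFullyPersistable.getAsBoolean()) {  // 2. first check
            return false;
        }
        pausedAtLeastOnce = true;                     // record the pause before re-checking
        // 3. an ack arrived since step 1, so the first result may be stale
        if (newAcknowledged && cursorFullyPersistable.getAsBoolean()) {
            pausedAtLeastOnce = false;
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        PauseCheckSketch blocked = new PauseCheckSketch(true, () -> false);
        System.out.println(blocked.shouldPause(true));    // true
        System.out.println(blocked.shouldPause(false));   // false
        PauseCheckSketch disabled = new PauseCheckSketch(false, () -> false);
        System.out.println(disabled.shouldPause(true));   // false
    }
}
```

The supplier is invoked a second time only when the ack marker was set after step 1, so the common path costs one cursor check per read.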





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1437093831


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -996,6 +1055,23 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
+        /**
+         * - Mark that an acknowledgement was executed.
+         * - If there was no previous pause caused by cursor data being too large to persist, we don't need to
+         *   manually trigger a new read. This avoids wasting CPU cycles.
+         * - Clear the marker that represents that delivery was paused at least once earlier.
+         */
+        blockSignatureOnCursorDataCanNotFullyPersist.markNewAcknowledged();
+        if (blockSignatureOnCursorDataCanNotFullyPersist.hasPausedAtLeastOnce()
+                && cursor.isCursorDataFullyPersistable()) {
+            // Clear the paused marker and trigger a new read.
+            blockSignatureOnCursorDataCanNotFullyPersist.clearMarkerPaused();
+            readMoreEntriesAsync();
+        }
+    }

Review Comment:
   Good suggestion. Improved, please take a look.
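
To show why the paused marker keeps the ack path cheap, here is a minimal single-threaded sketch of the hook quoted above. `AckResumeSketch`, `pauseOnCursorTooLarge`, and the `readsTriggered` counter are hypothetical; the counter stands in for calls to `readMoreEntriesAsync()`, and a plain boolean stands in for the cursor check.

```java
/** Hypothetical stand-in for the ack-side hook; not the Pulsar class. */
final class AckResumeSketch {
    private volatile boolean newAcknowledged;
    private volatile boolean pausedAtLeastOnce;
    private volatile boolean cursorFullyPersistable = true;
    // Single-threaded demo counter; stands in for calls to readMoreEntriesAsync().
    private int readsTriggered;

    /** Models the dispatcher pausing because the cursor data is too large. */
    void pauseOnCursorTooLarge() {
        cursorFullyPersistable = false;
        pausedAtLeastOnce = true;
    }

    void setCursorFullyPersistable(boolean value) {
        cursorFullyPersistable = value;
    }

    boolean hasNewAcknowledged() {
        return newAcknowledged;
    }

    int readsTriggered() {
        return readsTriggered;
    }

    void afterAck() {
        newAcknowledged = true;
        // Only a dispatcher that paused earlier needs a manual wake-up.
        if (pausedAtLeastOnce && cursorFullyPersistable) {
            pausedAtLeastOnce = false;
            readsTriggered++;
        }
    }

    public static void main(String[] args) {
        AckResumeSketch dispatcher = new AckResumeSketch();
        for (int i = 0; i < 1000; i++) {
            dispatcher.afterAck();                          // never paused: no extra reads
        }
        System.out.println(dispatcher.readsTriggered());    // 0

        dispatcher.pauseOnCursorTooLarge();
        dispatcher.afterAck();                              // cursor still too large
        System.out.println(dispatcher.readsTriggered());    // 0

        dispatcher.setCursorFullyPersistable(true);
        dispatcher.afterAck();                              // resumes dispatching once
        dispatcher.afterAck();                              // marker already cleared
        System.out.println(dispatcher.readsTriggered());    // 1
    }
}
```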





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1435909590


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/BlockDispatcherSignatureOnCursorDataCanNotFullyPersist.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+/***
+ * A signature related to the check "dispatching has paused because cursor data cannot be fully persisted".
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ *      please call {@link PersistentDispatcherMultipleConsumers#shouldPauseOnAckStatePersist}.
+ */
+public class BlockDispatcherSignatureOnCursorDataCanNotFullyPersist {
+
+    /**
+     * Used to mark that dispatching was paused at least once earlier because the cursor data can not be
+     * fully persisted.
+     * Why is this field needed? It prevents
+     * {@link PersistentDispatcherMultipleConsumers#afterAckMessages(Throwable, Object)} from calling
+     * {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} every time, which avoids wasting CPU cycles.
+     * We only call {@link PersistentDispatcherMultipleConsumers#readMoreEntries()} after the dispatching has been
+     * paused at least once earlier.
+     */
+    private volatile boolean markerPausedAtLeastOnce;
+
+    /**
+     * Used to mark that some acknowledgements were executed.
+     * A race condition might cause dispatching to get stuck; the steps to reproduce the issue are as follows:
+     * - {@link #markerPausedAtLeastOnce} is "false" now.
+     * - Thread-reading-entries: there are too many ack holes, so it starts to pause dispatching.
+     * - Thread-ack: acked all messages.

Review Comment:
   `cursor.isCursorDataFullyPersistable()` in `shouldPauseOnAckStatePersist` may be executed before the acknowledgement.
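
The interleaving raised here can be replayed deterministically. The sketch is a single-threaded model, not the PR code: `AckRaceReplay` and its fields are hypothetical, and the two threads are simulated by running their steps in the problematic order (reader checks the cursor, the ack thread acknowledges everything, the reader then finishes).

```java
/** Hypothetical single-threaded replay of the reader/ack interleaving. */
final class AckRaceReplay {
    boolean cursorFullyPersistable = false;   // too many ack holes at the start
    boolean pausedAtLeastOnce = false;
    boolean newAcknowledged = false;
    int readsTriggered = 0;

    /** Ack thread: acknowledge everything, then run the after-ack hook. */
    void ackAll() {
        cursorFullyPersistable = true;
        newAcknowledged = true;
        if (pausedAtLeastOnce && cursorFullyPersistable) {
            pausedAtLeastOnce = false;
            readsTriggered++;
        }
    }

    /** Returns true if dispatching ends up paused with nothing left to wake it. */
    static boolean replay(boolean recheckAfterAck) {
        AckRaceReplay state = new AckRaceReplay();
        state.newAcknowledged = false;                    // reader: clear the ack marker
        boolean pause = !state.cursorFullyPersistable;    // reader: check runs before the ack
        state.ackAll();                                   // ack thread interleaves here
        state.pausedAtLeastOnce = pause;                  // reader: record the pause
        if (pause && recheckAfterAck && state.newAcknowledged) {
            pause = !state.cursorFullyPersistable;        // reader: re-calculate the result
            state.pausedAtLeastOnce = pause;
        }
        // Stuck means paused, all messages acked, and no read was ever triggered.
        return pause && state.readsTriggered == 0;
    }

    public static void main(String[] args) {
        System.out.println(replay(false));   // true: stuck without the re-check
        System.out.println(replay(true));    // false: the re-check notices the ack
    }
}
```

Without the re-check, the ack hook sees `pausedAtLeastOnce == false` and skips the read, while the reader still acts on its stale result.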





Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode merged PR #21423:
URL: https://github.com/apache/pulsar/pull/21423




Re: [PR] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage [pulsar]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#issuecomment-1868780131

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: `26 lines` in your changes are missing coverage. Please review.
   > Comparison is base [(`181b20b`)](https://app.codecov.io/gh/apache/pulsar/commit/181b20bf59fdf51734c375b6563c8f1020a71881?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 73.42% compared to head [(`5cf31a2`)](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 73.40%.
   > Report is 6 commits behind head on master.
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #21423      +/-   ##
   ============================================
   - Coverage     73.42%   73.40%   -0.02%     
   - Complexity    32791    32827      +36     
   ============================================
     Files          1897     1899       +2     
     Lines        140633   140733     +100     
     Branches      15491    15508      +17     
   ============================================
   + Hits         103254   103306      +52     
   - Misses        29306    29345      +39     
   - Partials       8073     8082       +9     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/21423/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/21423/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.16% <24.63%> (-0.03%)` | :arrow_down: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/21423/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.85% <20.28%> (+0.22%)` | :arrow_up: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/21423/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `72.70% <62.31%> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...che/bookkeeper/mledger/impl/ManagedCursorImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9pbXBsL01hbmFnZWRDdXJzb3JJbXBsLmphdmE=) | `79.25% <100.00%> (+0.06%)` | :arrow_up: |
   | [...rg/apache/pulsar/broker/service/AbstractTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Fic3RyYWN0VG9waWMuamF2YQ==) | `87.77% <100.00%> (+0.03%)` | :arrow_up: |
   | [...a/org/apache/pulsar/broker/service/Dispatcher.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Rpc3BhdGNoZXIuamF2YQ==) | `57.89% <100.00%> (+2.33%)` | :arrow_up: |
   | [...atcherSignatureOnCursorDataCanNotFullyPersist.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvQmxvY2tEaXNwYXRjaGVyU2lnbmF0dXJlT25DdXJzb3JEYXRhQ2FuTm90RnVsbHlQZXJzaXN0LmphdmE=) | `100.00% <100.00%> (ø)` | |
   | [...sar/broker/service/persistent/PersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=) | `78.84% <100.00%> (+0.16%)` | :arrow_up: |
   | [...r/common/policies/data/HierarchyTopicPolicies.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9wb2xpY2llcy9kYXRhL0hpZXJhcmNoeVRvcGljUG9saWNpZXMuamF2YQ==) | `100.00% <100.00%> (ø)` | |
   | [...che/pulsar/common/policies/data/TopicPolicies.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9wb2xpY2llcy9kYXRhL1RvcGljUG9saWNpZXMuamF2YQ==) | `58.73% <100.00%> (+0.66%)` | :arrow_up: |
   | [...a/org/apache/bookkeeper/mledger/ManagedCursor.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9NYW5hZ2VkQ3Vyc29yLmphdmE=) | `40.00% <0.00%> (-2.86%)` | :arrow_down: |
   | [...tent/PersistentDispatcherSingleActiveConsumer.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudERpc3BhdGNoZXJTaW5nbGVBY3RpdmVDb25zdW1lci5qYXZh) | `70.49% <0.00%> (-1.38%)` | :arrow_down: |
   | [...ker/service/persistent/PersistentSubscription.java](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFN1YnNjcmlwdGlvbi5qYXZh) | `76.44% <53.84%> (-0.82%)` | :arrow_down: |
   | ... and [1 more](https://app.codecov.io/gh/apache/pulsar/pull/21423?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [71 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/21423/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   </details>

