Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/08 11:20:24 UTC

[GitHub] [pulsar] shibd commented on a diff in pull request #17318: [fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout.

shibd commented on code in PR #17318:
URL: https://github.com/apache/pulsar/pull/17318#discussion_r990629222


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java:
##########
@@ -71,9 +72,13 @@ public void run(Timeout t) throws Exception {
                     }
                     headPartition.clear();
                     redeliveryTimePartitions.addLast(headPartition);
-                    triggerRedelivery(consumerBase);
+                    messageIds = getRedeliveryMessages(consumerBase);
                 } finally {
                     writeLock.unlock();
+                    if (messageIds != null && !messageIds.isEmpty()) {
+                        consumerBase.onAckTimeoutSend(messageIds);
+                        consumerBase.redeliverUnacknowledgedMessages(messageIds);

Review Comment:
   You can refer to the timing diagram below:
   
   |Time | UnAckedMessageRedeliveryTracker timer thread | internalPinnedExecutor |
   |--| --| --|
   |0| [writeLock.lock()](https://github.com/apache/pulsar/blob/a2a73beb52865ddfcef92dab37ad0bcf3bc49fd5/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java#L62) // **first lock**| |
   |1| [consumerBase.redeliverUnacknowledgedMessages(messageIds)](https://github.com/apache/pulsar/blob/74b5d505554ea75c81cd004c3e5b61af094848bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java#L122)||
   |2| ConsumerImpl.redeliverUnacknowledgedMessages()| |
   |3| internalRedeliverUnacknowledgedMessages().get(); // block ||
   |4|  |internalRedeliverUnacknowledgedMessages()|
   |5| | [unAckedMessageTracker.clear();](https://github.com/apache/pulsar/blob/492c7df1579c12701da6a77af1986d3aa7ace840/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1920)|
   |6| |writeLock.lock() // **second lock, blocked forever**|
   
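   This was not a problem before because `redeliverUnacknowledgedMessages` ran entirely on the `UnAckedMessageRedeliveryTracker` timer thread, so `unAckedMessageTracker.clear()` took the write lock on the thread that already held it instead of on `internalPinnedExecutor`.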
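
   The timing diagram above can be reproduced in isolation. The following is a minimal sketch, not the Pulsar code: `LockHandoffSketch`, `clear()`, `redeliverAsync()` and the single-thread `pinnedExecutor` are hypothetical stand-ins for the tracker, `unAckedMessageTracker.clear()`, `internalRedeliverUnacknowledgedMessages()` and `internalPinnedExecutor`. It also assumes a bounded `get(timeout)` so the stall can be observed, whereas the call in the diagram blocks without a timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockHandoffSketch {
    // Stand-in for the tracker's write lock (assumed reentrant).
    private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
    // Stand-in for internalPinnedExecutor: one dedicated thread.
    private final ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();

    // Stand-in for unAckedMessageTracker.clear(): needs the write lock.
    private void clear() {
        writeLock.lock();
        try {
            // tracker state would be reset here
        } finally {
            writeLock.unlock();
        }
    }

    // Stand-in for the redelivery work that runs on the pinned executor.
    private CompletableFuture<Void> redeliverAsync() {
        return CompletableFuture.runAsync(this::clear, pinnedExecutor);
    }

    // Waits for the redelivery; returns false if it did not finish in time.
    private boolean await(CompletableFuture<Void> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Steps 0-6 of the diagram: wait for the executor while holding the lock.
    public boolean redeliverWhileHoldingLock(long timeoutMillis) {
        writeLock.lock();
        try {
            return await(redeliverAsync(), timeoutMillis);
        } finally {
            writeLock.unlock();
        }
    }

    // Shape of the change in the diff: collect under the lock, redeliver after unlock.
    public boolean redeliverAfterUnlock(long timeoutMillis) {
        writeLock.lock();
        try {
            // message ids would be collected here
        } finally {
            writeLock.unlock();
        }
        return await(redeliverAsync(), timeoutMillis);
    }

    public void shutdown() {
        pinnedExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        LockHandoffSketch sketch = new LockHandoffSketch();
        try {
            System.out.println("while holding lock: " + sketch.redeliverWhileHoldingLock(300));
            System.out.println("after unlock: " + sketch.redeliverAfterUnlock(5000));
        } finally {
            sketch.shutdown();
        }
    }
}
```

   In this sketch the first call times out because the executor thread can never acquire a write lock that the waiting thread still holds, while the second call completes because the lock is released before the wait begins.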
   


