You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/02 01:16:25 UTC

[pulsar] branch branch-2.10 updated: Ensure ack-timeout task gets re-scheduled when there is an exception in the final stage (#16337)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 0ce7245851c Ensure ack-timeout task gets re-scheduled when there is an exception in the final stage (#16337)
0ce7245851c is described below

commit 0ce7245851c76cc619a17b32166715251323ec14
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jul 1 18:13:43 2022 -0700

    Ensure ack-timeout task gets re-scheduled when there is an exception in the final stage (#16337)
---
 .../pulsar/client/impl/UnAckedMessageTracker.java      | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index a6b8faa2c2e..9ad30296c21 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -125,6 +125,10 @@ public class UnAckedMessageTracker implements Closeable {
             timeout = client.timer().newTimeout(new TimerTask() {
                 @Override
                 public void run(Timeout t) throws Exception {
+                    if (t.isCancelled()) {
+                        return;
+                    }
+
                     Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
                     messageIds.clear();
 
@@ -143,12 +147,16 @@ public class UnAckedMessageTracker implements Closeable {
                         headPartition.clear();
                         timePartitions.addLast(headPartition);
                     } finally {
-                        writeLock.unlock();
-                        if (messageIds.size() > 0) {
-                            consumerBase.onAckTimeoutSend(messageIds);
-                            consumerBase.redeliverUnacknowledgedMessages(messageIds);
+                        try {
+                            timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
+                        } finally {
+                            writeLock.unlock();
+
+                            if (!messageIds.isEmpty()) {
+                                consumerBase.onAckTimeoutSend(messageIds);
+                                consumerBase.redeliverUnacknowledgedMessages(messageIds);
+                            }
                         }
-                        timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
                     }
                 }
             }, this.tickDurationInMs, TimeUnit.MILLISECONDS);