You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/02 01:16:25 UTC
[pulsar] branch branch-2.10 updated: Ensure ack-timeout task gets re-scheduled when there is an exception in the final stage (#16337)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 0ce7245851c Ensure ack-timeout task gets re-scheduled when there is an exception in the final stage (#16337)
0ce7245851c is described below
commit 0ce7245851c76cc619a17b32166715251323ec14
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jul 1 18:13:43 2022 -0700
Ensure ack-timeout task gets re-scheduled when there is an exception in the final stage (#16337)
---
.../pulsar/client/impl/UnAckedMessageTracker.java | 18 +++++++++++++-----
1 file changed, 13 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index a6b8faa2c2e..9ad30296c21 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -125,6 +125,10 @@ public class UnAckedMessageTracker implements Closeable {
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
+ if (t.isCancelled()) {
+ return;
+ }
+
Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
messageIds.clear();
@@ -143,12 +147,16 @@ public class UnAckedMessageTracker implements Closeable {
headPartition.clear();
timePartitions.addLast(headPartition);
} finally {
- writeLock.unlock();
- if (messageIds.size() > 0) {
- consumerBase.onAckTimeoutSend(messageIds);
- consumerBase.redeliverUnacknowledgedMessages(messageIds);
+ try {
+ timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
+ } finally {
+ writeLock.unlock();
+
+ if (!messageIds.isEmpty()) {
+ consumerBase.onAckTimeoutSend(messageIds);
+ consumerBase.redeliverUnacknowledgedMessages(messageIds);
+ }
}
- timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
}
}
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);