You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/04/26 03:50:22 UTC

[pulsar] branch master updated: [fix][client] Fix negative ack not redelivery. (#15312)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f6532a43ef [fix][client] Fix negative ack not redelivery. (#15312)
9f6532a43ef is described below

commit 9f6532a43eff5021896ed2fd8e3a771ce4d8cc7b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Apr 26 11:50:15 2022 +0800

    [fix][client] Fix negative ack not redelivery. (#15312)
---
 .../pulsar/client/impl/NegativeAcksTest.java       |  3 +++
 .../pulsar/client/impl/NegativeAcksTracker.java    | 28 ++++++----------------
 2 files changed, 10 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 8ff339fed07..ba35529d024 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -106,6 +106,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions,
                 subscriptionType, negAcksDelayMillis);
         String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
+        if (usePartitions) {
+            admin.topics().createPartitionedTopic(topic, 2);
+        }
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 17238ece38e..6273f4d582e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -85,29 +85,10 @@ class NegativeAcksTracker implements Closeable {
     }
 
     public synchronized void add(MessageId messageId) {
-        if (messageId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-            messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
-                    batchMessageId.getPartitionIndex());
-        }
-
-        if (nackedMessages == null) {
-            nackedMessages = new HashMap<>();
-        }
-        nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos);
-
-        if (this.timeout == null) {
-            // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
-            // nack immediately following the current one will be batched into the same redeliver request.
-            this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
-        }
+        add(messageId, 0);
     }
 
     public synchronized void add(Message<?> message) {
-        if (negativeAckRedeliveryBackoff == null) {
-            add(message.getMessageId());
-            return;
-        }
         add(message.getMessageId(), message.getRedeliveryCount());
     }
 
@@ -127,7 +108,12 @@ class NegativeAcksTracker implements Closeable {
             nackedMessages = new HashMap<>();
         }
 
-        long backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
+        long backoffNs;
+        if (negativeAckRedeliveryBackoff != null) {
+            backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
+        } else {
+            backoffNs = nackDelayNanos;
+        }
         nackedMessages.put(messageId, System.nanoTime() + backoffNs);
 
         if (this.timeout == null) {