You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/06/26 03:35:05 UTC

[pulsar] branch master updated: Reduce unnecessary track message calls. (#4595)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 553f0fd  Reduce unnecessary track message calls. (#4595)
553f0fd is described below

commit 553f0fde697239ee72798f09371d0f283df7f6e9
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jun 26 11:35:00 2019 +0800

    Reduce unnecessary track message calls. (#4595)
---
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 54 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 43 ++++++-----------
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 3 files changed, 70 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 63c974d..a42d80c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -18,7 +18,11 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -29,6 +33,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Sets;
 
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
 
@@ -123,4 +128,53 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
         consumer2.close();
     }
 
+    @Test
+    public void testUnAckMessageRedeliveryWithReceiveAsync() throws PulsarClientException, ExecutionException, InterruptedException {
+        String topic = "persistent://my-property/my-ns/async-unack-redelivery";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("s1")
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .subscribe();
+        // Producer batches up to 5 messages, with a max publish delay of 1 second.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(5)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .create();
+        // Register every receiveAsync() future before anything is published.
+        final int messages = 10;
+        List<CompletableFuture<Message<String>>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(consumer.receiveAsync());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("my-message-" + i);
+        }
+        // First delivery: complete all pending futures without acknowledging.
+        int messageReceived = 0;
+        for (CompletableFuture<Message<String>> future : futures) {
+            Message<String> message = future.get();
+            assertNotNull(message);
+            messageReceived++;
+            // Don't ack message, wait for ack timeout.
+        }
+        // TestNG's assertEquals takes (actual, expected).
+        assertEquals(messageReceived, messages);
+        // Second delivery: the unacked messages are expected back after the ack timeout.
+        for (int i = 0; i < messages; i++) {
+            Message<String> message = consumer.receive();
+            assertNotNull(message);
+            messageReceived++;
+            consumer.acknowledge(message);
+        }
+
+        assertEquals(messageReceived, messages * 2);
+
+        producer.close();
+        consumer.close();
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a25a98a..8704fd3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -312,10 +312,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         Message<T> message;
         try {
             message = incomingMessages.take();
-            trackMessage(message);
-            Message<T> interceptMsg = beforeConsume(message);
-            messageProcessed(interceptMsg);
-            return interceptMsg;
+            messageProcessed(message);
+            return beforeConsume(message);
         } catch (InterruptedException e) {
             stats.incrementNumReceiveFailed();
             throw PulsarClientException.unwrap(e);
@@ -341,10 +339,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         if (message != null) {
-            trackMessage(message);
-            Message<T> interceptMsg = beforeConsume(message);
-            messageProcessed(interceptMsg);
-            result.complete(interceptMsg);
+            messageProcessed(message);
+            result.complete(beforeConsume(message));
         }
 
         return result;
@@ -355,12 +351,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         Message<T> message;
         try {
             message = incomingMessages.poll(timeout, unit);
-            trackMessage(message);
-            Message<T> interceptMsg = beforeConsume(message);
-            if (interceptMsg != null) {
-                messageProcessed(interceptMsg);
+            if (message == null) {
+                return null;
             }
-            return interceptMsg;
+            messageProcessed(message);
+            return beforeConsume(message);
         } catch (InterruptedException e) {
             State state = getState();
             if (state != State.Closing && state != State.Closed) {
@@ -821,7 +816,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
                 }
                 if (!pendingReceives.isEmpty()) {
-                    trackMessage(message);
                     notifyPendingReceivedCallback(message, null);
                 } else if (canEnqueueMessage(message)) {
                     incomingMessages.add(message);
@@ -1044,19 +1038,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         increaseAvailablePermits(currentCnx);
         stats.updateNumMsgsReceived(msg);
 
-        if (conf.getAckTimeoutMillis() != 0) {
-            // reset timer for messages that are received by the client
-            MessageIdImpl id = (MessageIdImpl) msg.getMessageId();
-            if (id instanceof BatchMessageIdImpl) {
-                id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
-            }
-            if (partitionIndex != -1) {
-                // we should no longer track this message, TopicsConsumer will take care from now onwards
-                unAckedMessageTracker.remove(id);
-            } else {
-                unAckedMessageTracker.add(id);
-            }
-        }
+        trackMessage(msg);
     }
 
     protected void trackMessage(Message<?> msg) {
@@ -1068,7 +1050,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     // do not add each item in batch message into tracker
                     id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
                 }
-                unAckedMessageTracker.add(id);
+                if (partitionIndex != -1) {
+                    // we should no longer track this message, TopicsConsumer will take care from now onwards
+                    unAckedMessageTracker.remove(id);
+                } else {
+                    unAckedMessageTracker.add(id);
+                }
             }
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a5e5138..5d45ca0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -260,7 +260,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         try {
             TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
                 consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
-            unAckedMessageTracker.add(topicMessage.getMessageId());
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Received message from topics-consumer {}",
@@ -270,6 +269,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
             if (!pendingReceives.isEmpty()) {
                 CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+                unAckedMessageTracker.add(topicMessage.getMessageId());
                 listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
             } else {
                 // Enqueue the message so that it can be retrieved when application calls receive()