You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/03 19:47:36 UTC

[GitHub] sijie closed pull request #3079: Change un-ack messages start tracking behavior

sijie closed pull request #3079: Change un-ack messages start tracking behavior
URL: https://github.com/apache/pulsar/pull/3079
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2ff500ba36..82df18c93c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -26,6 +26,7 @@
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 public class DeadLetterTopicTest extends ProducerConsumerBase {
@@ -256,4 +257,32 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception {
         assertNull(checkMessage);
         checkConsumer.close();
     }
+
+    /**
+     * issue https://github.com/apache/pulsar/issues/3077
+     */
+    @Test(timeOut = 200000)
+    public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException, InterruptedException {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-subscription")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        producer.send(("a message").getBytes());
+
+        // Wait a while, message should not be send to DLQ
+        Thread.sleep(5000L);
+
+        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index e178febdcb..40da99fb6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -349,9 +349,11 @@ public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException,
 
         Thread.sleep((long) (ackTimeOutMillis * 1.1));
 
-        for (int i = 0; i < totalMessages - 1; i++) {
+        for (int i = 0; i < totalMessages; i++) {
             Message<byte[]> msg = consumer.receive();
-            consumer.acknowledge(msg);
+            if (i != totalMessages - 1) {
+                consumer.acknowledge(msg);
+            }
         }
 
         assertEquals(consumer.getUnAckedMessageTracker().size(), 1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6ae925d21c..3e2d4d6a8a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -294,6 +294,7 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         Message<T> message;
         try {
             message = incomingMessages.take();
+            trackMessage(message);
             Message<T> interceptMsg = beforeConsume(message);
             messageProcessed(interceptMsg);
             return interceptMsg;
@@ -325,6 +326,7 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         if (message == null && conf.getReceiverQueueSize() == 0) {
             sendFlowPermitsToBroker(cnx(), 1);
         } else if (message != null) {
+            trackMessage(message);
             Message<T> interceptMsg = beforeConsume(message);
             messageProcessed(interceptMsg);
             result.complete(interceptMsg);
@@ -385,6 +387,7 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         Message<T> message;
         try {
             message = incomingMessages.poll(timeout, unit);
+            trackMessage(message);
             Message<T> interceptMsg = beforeConsume(message);
             if (interceptMsg != null) {
                 messageProcessed(interceptMsg);
@@ -829,11 +832,11 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
                 // Enqueue the message so that it can be retrieved when application calls receive()
                 // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
                 // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
-                unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
                 if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                     possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
                 }
                 if (!pendingReceives.isEmpty()) {
+                    trackMessage(message);
                     notifyPendingReceivedCallback(message, null);
                 } else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) {
                     incomingMessages.add(message);
@@ -957,7 +960,6 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
         MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
                 getPartitionIndex());
         BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
-        unAckedMessageTracker.add(batchMessage);
         List<MessageImpl<T>> possibleToDeadLetter = null;
         if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
             possibleToDeadLetter = new ArrayList<>();
@@ -1068,6 +1070,20 @@ protected synchronized void messageProcessed(Message<?> msg) {
         }
     }
 
+    protected void trackMessage(Message<?> msg) {
+        if (msg != null) {
+            MessageId messageId = msg.getMessageId();
+            if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
+                MessageIdImpl id = (MessageIdImpl)messageId;
+                if (id instanceof BatchMessageIdImpl) {
+                    // do not add each item in batch message into tracker
+                    id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
+                }
+                unAckedMessageTracker.add(id);
+            }
+        }
+    }
+
     void increaseAvailablePermits(ClientCnx currentCnx) {
         increaseAvailablePermits(currentCnx, 1);
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services