You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/15 00:03:58 UTC

[pulsar] branch master updated: NPE in UnAckedMessageTrackerDisabled#size (#4535) (#4536)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f452e6f  NPE in UnAckedMessageTrackerDisabled#size (#4535) (#4536)
f452e6f is described below

commit f452e6f817b5a613ded49a20ab2549fd5e34718c
Author: hello zepp <ji...@qq.com>
AuthorDate: Sat Jun 15 08:03:53 2019 +0800

    NPE in UnAckedMessageTrackerDisabled#size (#4535) (#4536)
---
 .../PerMessageUnAcknowledgedRedeliveryTest.java    | 43 ++++++++++++++++++++++
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  5 +++
 2 files changed, 48 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 6db887b..508de46 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 import java.util.concurrent.TimeUnit;
 
@@ -151,6 +152,48 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase {
     }
 
     @Test(timeOut = testTimeout)
+    public void testUnAckedMessageTrackerSize() throws Exception {
+        String key = "testUnAckedMessageTrackerSize";
+        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 15;
+
+        // 1. Connect a producer (batching disabled, single-partition routing)
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        // 2. Create a Shared-subscription consumer without setting an ackTimeout
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .receiverQueueSize(50).subscriptionType(SubscriptionType.Shared).subscribe();
+
+        // 3. Publish totalMessages / 3 (= 5) messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            String message = messagePredicate + i;
+            log.info("Producer produced: " + message);
+            producer.send(message.getBytes());
+        }
+
+        // 4. Receive messages without acking them, until a 100 ms receive returns null
+        Message<byte[]> message = consumer.receive();
+        while (message != null) {
+            String data = new String(message.getData());
+            log.info("Consumer received : " + data);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        UnAckedMessageTracker unAckedMessageTracker = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker();
+        long size = unAckedMessageTracker.size();
+        log.info(key + " Unacked Message Tracker size is " + size);
+        // 5. With no ackTimeout the tracker is expected to be the disabled variant: size 0, null-tolerant add/remove
+        assertEquals(size, 0);
+        assertTrue(unAckedMessageTracker.add(null));
+        assertTrue(unAckedMessageTracker.remove(null));
+        assertEquals(unAckedMessageTracker.removeMessagesTill(null), 0);
+    }
+
+    @Test(timeOut = testTimeout)
     public void testExclusiveAckedNormalTopic() throws Exception {
         String key = "testExclusiveAckedNormalTopic";
         final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index e40f846..0ab17d9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -55,6 +55,11 @@ public class UnAckedMessageTracker implements Closeable {
         }
 
         @Override
+        long size() {
+            return 0; // constant result: no field is dereferenced, so this override cannot throw an NPE
+        }
+
+        @Override
         public boolean add(MessageId m) {
             return true;
         }