You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/15 00:03:58 UTC
[pulsar] branch master updated: NPE in
UnAckedMessageTrackerDisabled#size (#4535) (#4536)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f452e6f NPE in UnAckedMessageTrackerDisabled#size (#4535) (#4536)
f452e6f is described below
commit f452e6f817b5a613ded49a20ab2549fd5e34718c
Author: hello zepp <ji...@qq.com>
AuthorDate: Sat Jun 15 08:03:53 2019 +0800
NPE in UnAckedMessageTrackerDisabled#size (#4535) (#4536)
---
.../PerMessageUnAcknowledgedRedeliveryTest.java | 43 ++++++++++++++++++++++
.../pulsar/client/impl/UnAckedMessageTracker.java | 5 +++
2 files changed, 48 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 6db887b..508de46 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
@@ -151,6 +152,48 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase {
}
@Test(timeOut = testTimeout)
+ public void testUnAckedMessageTrackerSize() throws Exception {
+ String key = "testUnAckedMessageTrackerSize";
+ final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+ final String subscriptionName = "my-ex-subscription-" + key;
+ final String messagePredicate = "my-message-" + key + "-";
+ final int totalMessages = 15;
+
+ // 1. producer connect
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ // 2. Create consumer,doesn't set the ackTimeout
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .receiverQueueSize(50).subscriptionType(SubscriptionType.Shared).subscribe();
+
+ // 3. producer publish messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ String message = messagePredicate + i;
+ log.info("Producer produced: " + message);
+ producer.send(message.getBytes());
+ }
+
+ // 4. Receiver receives the message, doesn't ack
+ Message<byte[]> message = consumer.receive();
+ while (message != null) {
+ String data = new String(message.getData());
+ log.info("Consumer received : " + data);
+ message = consumer.receive(100, TimeUnit.MILLISECONDS);
+ }
+ UnAckedMessageTracker unAckedMessageTracker = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker();
+ long size = unAckedMessageTracker.size();
+ log.info(key + " Unacked Message Tracker size is " + size);
+ // 5. If ackTimeout is not set, UnAckedMessageTracker is a disabled method
+ assertEquals(size, 0);
+ assertTrue(unAckedMessageTracker.add(null));
+ assertTrue(unAckedMessageTracker.remove(null));
+ assertEquals(unAckedMessageTracker.removeMessagesTill(null), 0);
+ }
+
+ @Test(timeOut = testTimeout)
public void testExclusiveAckedNormalTopic() throws Exception {
String key = "testExclusiveAckedNormalTopic";
final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index e40f846..0ab17d9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -55,6 +55,11 @@ public class UnAckedMessageTracker implements Closeable {
}
@Override
+ long size() {
+ return 0;
+ }
+
+ @Override
public boolean add(MessageId m) {
return true;
}