You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/14 16:15:19 UTC

[incubator-pulsar] branch master updated: Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ec74355  Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
ec74355 is described below

commit ec7435565520ee6ac3ed2ce4a0cd13e8de7dbca3
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Sat Sep 15 00:15:13 2018 +0800

    Issue #2574: Timeout message not get redeliver in TopicsConsumer when use message listener
    
    ### Motivation
    
    fix issue #2574 .
    Timeout message not get redeliver in TopicsConsumer when use message listener.
    This is caused by message listener wrongly set in individual sub-ConsumerImpl.
    
    ### Modifications
    
    set message listener to null for individual sub-ConsumerImpl.
    Add a UT
    
    ### Result
    
    UT passed.
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 58 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  1 +
 2 files changed, 59 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index d28d1d5..55be9ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -615,4 +616,61 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    /**
+     * Test Listener for github issue #2547
+     */
+    @Test(timeOut = 30000)
+    public void testMultiTopicsMessageListener() throws Exception {
+        String key = "MultiTopicsMessageListenerTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 6;
+
+        // set latch larger than totalMessages, so timeout message get resend
+        CountDownLatch latch = new CountDownLatch(totalMessages * 3);
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1);
+
+        admin.tenants().createTenant("prop", new TenantInfo());
+        admin.topics().createPartitionedTopic(topicName1, 2);
+
+        // 1. producer connect
+        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        // 2. Create consumer, set not ack in message listener, so time-out message will resend
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topics(topicNames)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(1000, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(100)
+            .messageListener((c1, msg) -> {
+                assertNotNull(msg, "Message cannot be null");
+                String receivedMessage = new String(msg.getData());
+                latch.countDown();
+
+                log.info("Received message [{}] in the listener, latch: {}",
+                    receivedMessage, latch.getCount());
+                // since not acked, it should retry another time
+                //c1.acknowledgeAsync(msg);
+            })
+            .subscribe();
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+
+        MultiTopicsConsumerImpl topicsConsumer = (MultiTopicsConsumerImpl) consumer;
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+        }
+
+        // verify should not time out, because of message redelivered several times.
+        latch.await();
+
+        consumer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 75fdac6..8b8556a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -485,6 +485,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         ConsumerConfigurationData<T> internalConsumerConfig = conf.clone();
         internalConsumerConfig.setSubscriptionName(subscription);
         internalConsumerConfig.setConsumerName(consumerName);
+        internalConsumerConfig.setMessageListener(null);
         return internalConsumerConfig;
     }