You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/03/22 03:21:26 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #3879: fix: ack timeout in pulsar cpp client when subscribing to regex topic

merlimat commented on a change in pull request #3879: fix: ack timeout in  pulsar cpp client when subscribing to regex topic
URL: https://github.com/apache/pulsar/pull/3879#discussion_r268027389
 
 

 ##########
 File path: pulsar-client-cpp/tests/BasicEndToEndTest.cc
 ##########
 @@ -2940,3 +2940,50 @@ TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
     // consumer C should be a different instance from A and B and should be with open state.
     ASSERT_EQ(ResultOk, consumerC.close());
 }
+
+static long regexTestMessagesReceived = 0;
+
+static void regexMessageListenerFunction(Consumer consumer, const Message &msg) {
+    regexTestMessagesReceived++;
+}
+
+TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    long unAckedMessagesTimeoutMs = 10000;
+    std::string subsName = "testRegexTopicsWithMessageListener-sub";
+    std::string pattern =
+        "persistent://public/default/testRegexTopicsWithMessageListenerTopic-.*";
+    ConsumerConfiguration consumerConf;
+    consumerConf.setConsumerType(ConsumerShared);
+    consumerConf.setMessageListener(
+            std::bind(regexMessageListenerFunction, std::placeholders::_1, std::placeholders::_2));
+    consumerConf.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    Result result = client.createProducer("persistent://public/default/testRegexTopicsWithMessageListenerTopic-1", producerConf, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribeWithRegex(pattern, subsName, consumerConf, consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subsName);
+
+    for (int i = 0; i < 10; i++) {
+        Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
+        producer.sendAsync(msg, nullptr);
+    }
+
+    producer.flush();
+    long timeWaited = 0;
+    while(true) {
+        // maximum wait time
+        ASSERT_LE(timeWaited, unAckedMessagesTimeoutMs * 1000 * 3);
+        if (regexTestMessagesReceived >= 10 * 2) {
+            break;
+        }
+        usleep(500000);
 
 Review comment:
   `unistd.h` is only posix specific. `std::this_thread::sleep` is the preferred c++ 11 way to sleep 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services