You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/25 14:27:58 UTC

[pulsar] 01/03: [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3899d53959971b56ddce8e4809b6c759bc1d061
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jul 13 09:16:06 2022 +0800

    [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510)
    
    Fixes #16509
    
    ### Motivation
    
    Since all producers and consumers of the same Client share the same
    connection pool, if one fails, the following producer or consumer might
    reuse the broken connection and fail with `ResultConnectError`. This
    makes `testWrongListener` flaky because the consumer and reader are
    created after the producer creation failed with
    `ResultServiceUnitNotReady`.
    
    ### Modifications
    
    Recreate the `Client` for the subsequent creation of `Consumer` and
    `Reader` so that new connection pools will be used for them.
    
    There is also a potential bug that makes `Client::shutdown` wait for the
    max timeout (3 seconds). This PR also applies the remaining timeout to the
    other executors.
    
    ### Verifying this change
    
    After this change, I've run the following command in my local env and it
    never failed.
    
    ```bash
    # Rerun testWrongListener 20 times
    ./tests/main --gtest_filter='ClientTest.testWrongListener' --gtest_repeat=20
    ```
    
    (cherry picked from commit 10307f90ab9b0d168a5a40f40027177fae587602)
---
 pulsar-client-cpp/lib/ClientImpl.cc   | 4 ++--
 pulsar-client-cpp/tests/ClientTest.cc | 8 +++++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 7c9a3a18f4a..d419ffe7dc5 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -609,12 +609,12 @@ void ClientImpl::shutdown() {
     LOG_DEBUG("ioExecutorProvider_ is closed");
 
     timeoutProcessor.tik();
-    listenerExecutorProvider_->close();
+    listenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
     timeoutProcessor.tok();
     LOG_DEBUG("listenerExecutorProvider_ is closed");
 
     timeoutProcessor.tik();
-    partitionListenerExecutorProvider_->close();
+    partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
     timeoutProcessor.tok();
     LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index dd073fb6bfc..14c3af70aa3 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -234,17 +234,19 @@ TEST(ClientTest, testWrongListener) {
     Producer producer;
     ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
     ASSERT_EQ(ResultProducerNotInitialized, producer.close());
+    ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
 
+    // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the
+    // creation of Consumer or Reader could fail with ResultConnectError.
+    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
     Consumer consumer;
     ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
     ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());
 
-    ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
     ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
     ASSERT_EQ(ResultOk, client.close());
 
-    // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the
-    // creation of Reader would fail with ResultConnectError.
     client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
 
     // Currently Reader can only read a non-partitioned topic in C++ client