You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/10 06:52:55 UTC

[pulsar] 16/19: [fix][C++ client] Fix the close of Client might stuck or return a wrong result (#16285)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 88b1636578c6c1bb99c8d530f8f1b495e85a50de
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jul 6 23:21:24 2022 +0800

    [fix][C++ client] Fix the close of Client might stuck or return a wrong result (#16285)
    
    Fixes https://github.com/apache/pulsar/issues/15976
    
    ### Motivation
    
    Currently, even if a producer, consumer, or reader fails to be
    created, it is still added to the producers or consumers list in
    `Client`. `Client::close` first closes the internal producers and
    consumers; if any of them is one whose creation failed,
    `Client::close` returns `ResultAlreadyClosed`. Even worse, closing a
    failed partitioned producer might get stuck.
    
    It also makes the Python test `test_listener_name_client` flaky because
    `client.close()` throws an exception if the underlying
    `Client::close` call in the C++ client doesn't return `ResultOk`.
    
    ### Modifications
    
    - Only add the created producer or consumer to the internal list of
      `Client` after the creation has succeeded.
    - Add `ClientTest.testWrongListener` to verify that when a producer,
      consumer, or reader fails to be created, the internal producer list
      and consumer list are both empty and `client.close()` returns
      `ResultOk`.
    
    (cherry picked from commit e23d312c04da1d82d35f9e2faf8a446f8e8a4eeb)
---
 pulsar-client-cpp/lib/ClientImpl.cc   | 41 ++++++++++++++++++++---------------
 pulsar-client-cpp/lib/ReaderImpl.cc   | 21 ++++++++++--------
 pulsar-client-cpp/lib/ReaderImpl.h    |  4 +---
 pulsar-client-cpp/tests/ClientTest.cc | 32 +++++++++++++++++++++++++++
 4 files changed, 69 insertions(+), 29 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 931fadeaae7..bfb72aa6ac3 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -185,9 +185,6 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
         producer->getProducerCreatedFuture().addListener(
             std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
                       std::placeholders::_2, callback, producer));
-        Lock lock(mutex_);
-        producers_.push_back(producer);
-        lock.unlock();
         producer->start();
     } else {
         LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on "
@@ -198,7 +195,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
 
 void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                        CreateProducerCallback callback, ProducerImplBasePtr producer) {
-    callback(result, Producer(producer));
+    if (result == ResultOk) {
+        Lock lock(mutex_);
+        producers_.push_back(producer);
+        lock.unlock();
+        callback(result, Producer(producer));
+    } else {
+        callback(result, {});
+    }
 }
 
 void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
@@ -241,10 +245,13 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
 
     ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
                                                         getListenerExecutorProvider()->get(), callback);
-    reader->start(startMessageId);
-
-    Lock lock(mutex_);
-    consumers_.push_back(reader->getConsumer());
+    ConsumerImplBasePtr consumer = reader->getConsumer().lock();
+    auto self = shared_from_this();
+    reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
+        Lock lock(mutex_);
+        consumers_.push_back(weakConsumerPtr);
+        lock.unlock();
+    });
 }
 
 void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
@@ -291,9 +298,6 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam
         consumer->getConsumerCreatedFuture().addListener(
             std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
                       std::placeholders::_2, callback, consumer));
-        Lock lock(mutex_);
-        consumers_.push_back(consumer);
-        lock.unlock();
         consumer->start();
     } else {
         LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer:  " << result);
@@ -317,6 +321,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
             return;
         }
     }
+    lock.unlock();
 
     if (topicNamePtr) {
         std::string randomName = generateRandomName();
@@ -331,8 +336,6 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
     consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
                                                                shared_from_this(), std::placeholders::_1,
                                                                std::placeholders::_2, callback, consumer));
-    consumers_.push_back(consumer);
-    lock.unlock();
     consumer->start();
 }
 
@@ -389,9 +392,6 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
         consumer->getConsumerCreatedFuture().addListener(
             std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
                       std::placeholders::_2, callback, consumer));
-        Lock lock(mutex_);
-        consumers_.push_back(consumer);
-        lock.unlock();
         consumer->start();
     } else {
         LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString()
@@ -402,7 +402,14 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
 
 void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                        SubscribeCallback callback, ConsumerImplBasePtr consumer) {
-    callback(result, Consumer(consumer));
+    if (result == ResultOk) {
+        Lock lock(mutex_);
+        consumers_.push_back(consumer);
+        lock.unlock();
+        callback(result, Consumer(consumer));
+    } else {
+        callback(result, {});
+    }
 }
 
 Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 9401c120119..c660c01ab2a 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -35,7 +35,8 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, con
                        const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
     : topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
 
-void ReaderImpl::start(const MessageId& startMessageId) {
+void ReaderImpl::start(const MessageId& startMessageId,
+                       std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
     ConsumerConfiguration consumerConf;
     consumerConf.setConsumerType(ConsumerExclusive);
     consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
@@ -79,19 +80,21 @@ void ReaderImpl::start(const MessageId& startMessageId) {
         client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), false, NonPartitioned,
         Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
     consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
-    consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
-                                                                shared_from_this(), std::placeholders::_1,
-                                                                std::placeholders::_2));
+    auto self = shared_from_this();
+    consumer_->getConsumerCreatedFuture().addListener(
+        [this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
+            if (result == ResultOk) {
+                readerCreatedCallback_(result, Reader(self));
+                callback(weakConsumerPtr);
+            } else {
+                readerCreatedCallback_(result, {});
+            }
+        });
     consumer_->start();
 }
 
 const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }
 
-void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
-    auto self = shared_from_this();
-    readerCreatedCallback_(result, Reader(self));
-}
-
 Result ReaderImpl::readNext(Message& msg) {
     Result res = consumer_->receive(msg);
     acknowledgeIfNecessary(res, msg);
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 6de6c02e460..b0d8a6bc40a 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -42,7 +42,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
     ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
                const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
 
-    void start(const MessageId& startMessageId);
+    void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);
 
     const std::string& getTopic() const;
 
@@ -65,8 +65,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
     bool isConnected() const;
 
    private:
-    void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer);
-
     void messageListener(Consumer consumer, const Message& msg);
 
     void acknowledgeIfNecessary(Result result, const Message& msg);
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 135c3f19b51..7c5ff8b307f 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -236,3 +236,35 @@ TEST(ClientTest, testReferenceCount) {
     ASSERT_EQ(readerWeakPtr.use_count(), 0);
     client.close();
 }
+
+TEST(ClientTest, testWrongListener) {
+    const std::string topic = "client-test-wrong-listener-" + std::to_string(time(nullptr));
+    auto httpCode = makePutRequest(
+        "http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions", "3");
+    LOG_INFO("create " << topic << ": " << httpCode);
+
+    Client client(lookupUrl, ClientConfiguration().setListenerName("test"));
+    Producer producer;
+    ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultProducerNotInitialized, producer.close());
+
+    Consumer consumer;
+    ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
+    ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());
+
+    ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
+    ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
+
+    // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the
+    // creation of Reader would fail with ResultConnectError.
+    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
+
+    // Currently Reader can only read a non-partitioned topic in C++ client
+    Reader reader;
+    ASSERT_EQ(ResultServiceUnitNotReady,
+              client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader));
+    ASSERT_EQ(ResultConsumerNotInitialized, reader.close());
+    ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
+}