You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/10 06:52:55 UTC
[pulsar] 16/19: [fix][C++ client] Fix the close of Client might stuck or return a wrong result (#16285)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 88b1636578c6c1bb99c8d530f8f1b495e85a50de
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jul 6 23:21:24 2022 +0800
[fix][C++ client] Fix the close of Client might stuck or return a wrong result (#16285)
Fixes https://github.com/apache/pulsar/issues/15976
### Motivation
Currently even if the producer, consumer, or reader failed to
create, it would still be added to the producers or consumers in
`Client`. `Client::close` first closes the internal producers and
consumers, if the producers or consumers to close include failed
producers or consumers, `Client::close` would return
`ResultAlreadyClosed`. Even worse, closing a failed partitioned producer
might stuck.
It also makes the Python test `test_listener_name_client` flaky because
`client.close()` will throw an exception if the underlying
`Client::close` call in C++ client doesn't return `ResultOk`.
### Modifications
- Only adding the created producer or consumer to the internal list of
`Client` after the creation succeeded.
- Add `ClientTest.testWrongListener` to verify when producer, consumer,
reader failed to create, the internal producer list and consumer list
are both empty. And `client.close()` would return `ResultOk`.
(cherry picked from commit e23d312c04da1d82d35f9e2faf8a446f8e8a4eeb)
---
pulsar-client-cpp/lib/ClientImpl.cc | 41 ++++++++++++++++++++---------------
pulsar-client-cpp/lib/ReaderImpl.cc | 21 ++++++++++--------
pulsar-client-cpp/lib/ReaderImpl.h | 4 +---
pulsar-client-cpp/tests/ClientTest.cc | 32 +++++++++++++++++++++++++++
4 files changed, 69 insertions(+), 29 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 931fadeaae7..bfb72aa6ac3 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -185,9 +185,6 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
producer->getProducerCreatedFuture().addListener(
std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, producer));
- Lock lock(mutex_);
- producers_.push_back(producer);
- lock.unlock();
producer->start();
} else {
LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on "
@@ -198,7 +195,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
CreateProducerCallback callback, ProducerImplBasePtr producer) {
- callback(result, Producer(producer));
+ if (result == ResultOk) {
+ Lock lock(mutex_);
+ producers_.push_back(producer);
+ lock.unlock();
+ callback(result, Producer(producer));
+ } else {
+ callback(result, {});
+ }
}
void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
@@ -241,10 +245,13 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
getListenerExecutorProvider()->get(), callback);
- reader->start(startMessageId);
-
- Lock lock(mutex_);
- consumers_.push_back(reader->getConsumer());
+ ConsumerImplBasePtr consumer = reader->getConsumer().lock();
+ auto self = shared_from_this();
+ reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
+ Lock lock(mutex_);
+ consumers_.push_back(weakConsumerPtr);
+ lock.unlock();
+ });
}
void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
@@ -291,9 +298,6 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
- Lock lock(mutex_);
- consumers_.push_back(consumer);
- lock.unlock();
consumer->start();
} else {
LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result);
@@ -317,6 +321,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
return;
}
}
+ lock.unlock();
if (topicNamePtr) {
std::string randomName = generateRandomName();
@@ -331,8 +336,6 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
- consumers_.push_back(consumer);
- lock.unlock();
consumer->start();
}
@@ -389,9 +392,6 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
- Lock lock(mutex_);
- consumers_.push_back(consumer);
- lock.unlock();
consumer->start();
} else {
LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString()
@@ -402,7 +402,14 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
- callback(result, Consumer(consumer));
+ if (result == ResultOk) {
+ Lock lock(mutex_);
+ consumers_.push_back(consumer);
+ lock.unlock();
+ callback(result, Consumer(consumer));
+ } else {
+ callback(result, {});
+ }
}
Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 9401c120119..c660c01ab2a 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -35,7 +35,8 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, con
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
: topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
-void ReaderImpl::start(const MessageId& startMessageId) {
+void ReaderImpl::start(const MessageId& startMessageId,
+ std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerExclusive);
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
@@ -79,19 +80,21 @@ void ReaderImpl::start(const MessageId& startMessageId) {
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), false, NonPartitioned,
Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
- consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
- shared_from_this(), std::placeholders::_1,
- std::placeholders::_2));
+ auto self = shared_from_this();
+ consumer_->getConsumerCreatedFuture().addListener(
+ [this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
+ if (result == ResultOk) {
+ readerCreatedCallback_(result, Reader(self));
+ callback(weakConsumerPtr);
+ } else {
+ readerCreatedCallback_(result, {});
+ }
+ });
consumer_->start();
}
const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }
-void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
- auto self = shared_from_this();
- readerCreatedCallback_(result, Reader(self));
-}
-
Result ReaderImpl::readNext(Message& msg) {
Result res = consumer_->receive(msg);
acknowledgeIfNecessary(res, msg);
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 6de6c02e460..b0d8a6bc40a 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -42,7 +42,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
- void start(const MessageId& startMessageId);
+ void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);
const std::string& getTopic() const;
@@ -65,8 +65,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
bool isConnected() const;
private:
- void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer);
-
void messageListener(Consumer consumer, const Message& msg);
void acknowledgeIfNecessary(Result result, const Message& msg);
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 135c3f19b51..7c5ff8b307f 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -236,3 +236,35 @@ TEST(ClientTest, testReferenceCount) {
ASSERT_EQ(readerWeakPtr.use_count(), 0);
client.close();
}
+
+TEST(ClientTest, testWrongListener) {
+ const std::string topic = "client-test-wrong-listener-" + std::to_string(time(nullptr));
+ auto httpCode = makePutRequest(
+ "http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions", "3");
+ LOG_INFO("create " << topic << ": " << httpCode);
+
+ Client client(lookupUrl, ClientConfiguration().setListenerName("test"));
+ Producer producer;
+ ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultProducerNotInitialized, producer.close());
+
+ Consumer consumer;
+ ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
+ ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());
+
+ ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
+ ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+ ASSERT_EQ(ResultOk, client.close());
+
+ // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the
+ // creation of Reader would fail with ResultConnectError.
+ client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
+
+ // Currently Reader can only read a non-partitioned topic in C++ client
+ Reader reader;
+ ASSERT_EQ(ResultServiceUnitNotReady,
+ client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader));
+ ASSERT_EQ(ResultConsumerNotInitialized, reader.close());
+ ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+ ASSERT_EQ(ResultOk, client.close());
+}