You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 11:48:57 UTC
[pulsar] 02/12: [C++] Fix UnknownError might be returned for a partitioned producer (#15161)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 95b5ef8be37da96c99711bddad2d7996dd80007d
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Apr 14 14:59:46 2022 +0800
[C++] Fix UnknownError might be returned for a partitioned producer (#15161)
Fixes #15078
When a partitioned producer is created and some of the partitions
failed to create, `closeAsync` will be called immediately, even if other
partitions are still in the process of creating their associated single
producers.
Since `closeAsync` is called before calling `setFailed` on the
`partitionedProducerCreatedPromise_` field, there is a race condition
in which all single producers may be closed before the promise is set.
In that case the promise will be set with `ResultUnknownError`, see
https://github.com/apache/pulsar/blob/4aeeed5dab9dfe9493526f36d539b3ef29cf6fe5/pulsar-client-cpp/lib/PartitionedProducerImpl.cc#L317.
Call `closeAsync` only after all single producers have either failed or
succeeded, and only if at least one of them failed. Also ensure that
`partitionedProducerCreatedPromise_` is completed before calling
`closeAsync`.
This PR also makes the state of a partitioned producer atomic because
using a mutex to protect it makes code hard to write.
Create a separate namespace `public/test-backlog-quotas` to test the
case when the backlog quota is exceeded. Then add the
`testBacklogQuotasExceeded` test, which builds up a backlog by creating a
consumer and sending some messages to one partition of the topic.
In this test, only one partition has a backlog and fails with the
related error, so the test verifies that `createProducer` returns the
correct error instead of `ResultUnknownError`.
(cherry picked from commit 0f8559611914a17efe222bf6ed1dd621d5a9cf45)
---
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 55 +++++++++---------------
pulsar-client-cpp/lib/PartitionedProducerImpl.h | 12 ++----
pulsar-client-cpp/pulsar-test-service-start.sh | 4 ++
pulsar-client-cpp/tests/ProducerTest.cc | 43 ++++++++++++++++++
4 files changed, 71 insertions(+), 43 deletions(-)
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index bdd23ed6c91..9f197f0d45f 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -136,28 +136,29 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
unsigned int partitionIndex) {
// to indicate, we are doing cleanup using closeAsync after producer create
// has failed and the invocation of closeAsync is not from client
- CloseCallback closeCallback = NULL;
- Lock lock(mutex_);
+ const auto numPartitions = getNumPartitionsWithLock();
+ assert(numProducersCreated_ <= numPartitions && partitionIndex <= numPartitions);
+
if (state_ == Failed) {
- // Ignore, we have already informed client that producer creation failed
+ // We have already informed client that producer creation failed
+ if (++numProducersCreated_ == numPartitions) {
+ closeAsync(nullptr);
+ }
return;
}
- const auto numPartitions = getNumPartitionsWithLock();
- assert(numProducersCreated_ <= numPartitions);
+
if (result != ResultOk) {
- state_ = Failed;
- lock.unlock();
- closeAsync(closeCallback);
- partitionedProducerCreatedPromise_.setFailed(result);
LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
+ partitionedProducerCreatedPromise_.setFailed(result);
+ state_ = Failed;
+ if (++numProducersCreated_ == numPartitions) {
+ closeAsync(nullptr);
+ }
return;
}
- assert(partitionIndex <= numPartitions);
- numProducersCreated_++;
- if (numProducersCreated_ == numPartitions) {
+ if (++numProducersCreated_ == numPartitions) {
state_ = Ready;
- lock.unlock();
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
@@ -181,7 +182,7 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition
// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
- if (!assertState(Ready)) {
+ if (state_ != Ready) {
callback(ResultAlreadyClosed, msg.getMessageId());
return;
}
@@ -211,18 +212,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// override
-void PartitionedProducerImpl::shutdown() { setState(Closed); }
-
-void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
- Lock lock(mutex_);
- state_ = state;
- lock.unlock();
-}
-
-bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
- Lock lock(mutex_);
- return state_ == state;
-}
+void PartitionedProducerImpl::shutdown() { state_ = Closed; }
const std::string& PartitionedProducerImpl::getProducerName() const {
Lock producersLock(producersMutex_);
@@ -251,7 +241,10 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
* create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
*/
void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
- setState(Closing);
+ if (state_ == Closing || state_ == Closed) {
+ return;
+ }
+ state_ = Closing;
unsigned int producerAlreadyClosed = 0;
@@ -280,7 +273,7 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
* handleSinglePartitionProducerCreated
*/
if (producerAlreadyClosed == numProducers && closeCallback) {
- setState(Closed);
+ state_ = Closed;
closeCallback(ResultOk);
}
}
@@ -288,14 +281,12 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
const unsigned int partitionIndex,
CloseCallback callback) {
- Lock lock(mutex_);
if (state_ == Failed) {
// we should have already notified the client by callback
return;
}
if (result != ResultOk) {
state_ = Failed;
- lock.unlock();
LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
if (callback) {
callback(result);
@@ -309,7 +300,6 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
// closed all successfully
if (!numProducersCreated_) {
state_ = Closed;
- lock.unlock();
// set the producerCreatedPromise to failure, if client called
// closeAsync and it's not failure to create producer, the promise
// is set second time here, first time it was successful. So check
@@ -395,7 +385,6 @@ void PartitionedProducerImpl::getPartitionMetadata() {
void PartitionedProducerImpl::handleGetPartitions(Result result,
const LookupDataResultPtr& lookupDataResult) {
- Lock stateLock(mutex_);
if (state_ != Ready) {
return;
}
@@ -428,11 +417,9 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
}
bool PartitionedProducerImpl::isConnected() const {
- Lock stateLock(mutex_);
if (state_ != Ready) {
return false;
}
- stateLock.unlock();
Lock producersLock(producersMutex_);
const auto producers = producers_;
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 874d6cda526..0a8c10e2213 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -30,7 +30,7 @@ namespace pulsar {
class PartitionedProducerImpl : public ProducerImplBase,
public std::enable_shared_from_this<PartitionedProducerImpl> {
public:
- enum PartitionedProducerState
+ enum State
{
Pending,
Ready,
@@ -73,8 +73,6 @@ class PartitionedProducerImpl : public ProducerImplBase,
void notifyResult(CloseCallback closeCallback);
- void setState(PartitionedProducerState state);
-
friend class PulsarFriend;
private:
@@ -83,7 +81,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
const TopicNamePtr topicName_;
const std::string topic_;
- unsigned int numProducersCreated_ = 0;
+ std::atomic_uint numProducersCreated_{0};
/*
* set when one or more Single Partition Creation fails, close will cleanup and fail the create callbackxo
@@ -99,10 +97,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
mutable std::mutex producersMutex_;
MessageRoutingPolicyPtr routerPolicy_;
- // mutex_ is used to share state_, and numProducersCreated_
- mutable std::mutex mutex_;
-
- PartitionedProducerState state_ = Pending;
+ std::atomic<State> state_{Pending};
// only set this promise to value, when producers on all partitions are created.
Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;
@@ -124,7 +119,6 @@ class PartitionedProducerImpl : public ProducerImplBase,
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
- bool assertState(const PartitionedProducerState state);
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh
index 248d628b9c2..2bee18e64b9 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -106,6 +106,10 @@ $PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-4 \
--role "anonymous"
$PULSAR_DIR/bin/pulsar-admin namespaces set-encryption-required public/default-4 -e
+# Create "public/test-backlog-quotas" to test backlog quotas policy
+$PULSAR_DIR/bin/pulsar-admin namespaces create public/test-backlog-quotas \
+ --clusters standalone
+
# Create "private" tenant
$PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c "standalone"
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 210f01345d4..14461429da3 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -240,4 +240,47 @@ TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
client.close();
LOG_INFO("End of run " << run);
}
+}
+
+TEST(ProducerTest, testBacklogQuotasExceeded) {
+ std::string ns = "public/test-backlog-quotas";
+ std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr));
+
+ int res = makePutRequest(adminUrl + "admin/v2/persistent/" + topic + "/partitions", "5");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+ LOG_INFO("Created topic " << topic << " with 5 partitions");
+
+ auto setBacklogPolicy = [&ns](const std::string& policy, int limitSize) {
+ const auto body =
+ R"({"policy":")" + policy + R"(","limitSize":)" + std::to_string(limitSize) + "}";
+ int res = makePostRequest(adminUrl + "admin/v2/namespaces/" + ns + "/backlogQuota", body);
+ LOG_INFO(res << " | Change the backlog policy to: " << body);
+ ASSERT_TRUE(res == 204 || res == 409);
+ };
+
+ Client client(serviceUrl);
+
+ // Create a topic with backlog size that is greater than 1024
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); // create a cursor
+ Producer producer;
+
+ const auto partition = topic + "-partition-0";
+ ASSERT_EQ(ResultOk, client.createProducer(partition, producer));
+ ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(std::string(1024L, 'a')).build()));
+ ASSERT_EQ(ResultOk, producer.close());
+
+ setBacklogPolicy("producer_request_hold", 1024);
+ ASSERT_EQ(ResultProducerBlockedQuotaExceededError, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultProducerBlockedQuotaExceededError, client.createProducer(partition, producer));
+
+ setBacklogPolicy("producer_exception", 1024);
+ ASSERT_EQ(ResultProducerBlockedQuotaExceededException, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultProducerBlockedQuotaExceededException, client.createProducer(partition, producer));
+
+ setBacklogPolicy("consumer_backlog_eviction", 1024);
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultOk, client.createProducer(partition, producer));
+
+ client.close();
}
\ No newline at end of file