You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/02 05:55:04 UTC
[pulsar] branch branch-2.9 updated: [C++] Fix flaky tests about reference count (#14854)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 7dc9a49 [C++] Fix flaky tests about reference count (#14854)
7dc9a49 is described below
commit 7dc9a4961a23d95409e01cd1e1e25ef501167ad0
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Apr 1 19:09:14 2022 +0800
[C++] Fix flaky tests about reference count (#14854)
Fixes #14848
Fixes #14719
### Motivation
#7793 introduced a `testReferenceLeak` to avoid a cyclic reference of the
reader. However, it adds an unused field `readerImplWeakPtr_` only for
tests. The access to this field is not thread safe: the write
operation happens in `handleConsumerCreated` while the read operation
can happen anywhere via the getter. So there is a small chance that
`readerPtr` in `testReferenceLeak` doesn't point to the right object.
In addition, we should only guarantee that the reference count becomes 0
after the producer, consumer or reader goes out of its scope. #14797
adds a `ClientTest.testReferenceCount` but it's also flaky. This is because
the shared pointer of `ProducerImpl` is published to another thread
via `shared_from_this()`, while the test has a strong expectation that the
reference count is exactly 1.
### Modifications
- Remove `readerImplWeakPtr_` from `ReaderImpl` and get the weak pointer
from `Reader` directly by adding a method to `PulsarFriend`.
- Add the check of reader's reference count to `testReferenceCount` and
remove the redundant `testReferenceLeak`.
- Instead of asserting the reference count of producer/consumer/reader
is 1, just assert that it's greater than 0.
(cherry picked from commit f84ff571df95f99efa4596e65324def1084fc11b)
---
pulsar-client-cpp/lib/ReaderImpl.cc | 5 ----
pulsar-client-cpp/lib/ReaderImpl.h | 5 +---
pulsar-client-cpp/tests/ClientTest.cc | 26 +++++++++++++++--
pulsar-client-cpp/tests/PulsarFriend.h | 7 +++++
pulsar-client-cpp/tests/ReaderTest.cc | 51 +++-------------------------------
pulsar-client-cpp/tests/ReaderTest.h | 32 ---------------------
6 files changed, 35 insertions(+), 91 deletions(-)
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 0a7b321..9401c12 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -90,11 +90,8 @@ const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic();
void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
auto self = shared_from_this();
readerCreatedCallback_(result, Reader(self));
- readerImplWeakPtr_ = self;
}
-ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; }
-
Result ReaderImpl::readNext(Message& msg) {
Result res = consumer_->receive(msg);
acknowledgeIfNecessary(res, msg);
@@ -144,8 +141,6 @@ void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
});
}
-ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
-
bool ReaderImpl::isConnected() const { return consumer_->isConnected(); }
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index a546ae8..6de6c02 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -53,7 +53,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();
- ConsumerImplPtr getConsumer();
+ ConsumerImplBaseWeakPtr getConsumer() const noexcept { return consumer_; }
void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
@@ -62,8 +62,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
void getLastMessageIdAsync(GetLastMessageIdCallback callback);
- ReaderImplWeakPtr getReaderImplWeakPtr();
-
bool isConnected() const;
private:
@@ -79,7 +77,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
ConsumerImplPtr consumer_;
ReaderCallback readerCreatedCallback_;
ReaderListener readerListener_;
- ReaderImplWeakPtr readerImplWeakPtr_;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 920430d..1ba0164 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -24,6 +24,9 @@
#include <future>
#include <pulsar/Client.h>
#include "../lib/checksum/ChecksumProvider.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
using namespace pulsar;
@@ -184,22 +187,39 @@ TEST(ClientTest, testReferenceCount) {
auto &producers = PulsarFriend::getProducers(client);
auto &consumers = PulsarFriend::getConsumers(client);
+ ReaderImplWeakPtr readerWeakPtr;
{
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
ASSERT_EQ(producers.size(), 1);
- ASSERT_EQ(producers[0].use_count(), 1);
+ ASSERT_TRUE(producers[0].use_count() > 0);
+ LOG_INFO("Reference count of the producer: " << producers[0].use_count());
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
ASSERT_EQ(consumers.size(), 1);
- ASSERT_EQ(consumers[0].use_count(), 1);
+ ASSERT_TRUE(consumers[0].use_count() > 0);
+ LOG_INFO("Reference count of the consumer: " << consumers[0].use_count());
+
+ ReaderConfiguration readerConf;
+ Reader reader;
+ ASSERT_EQ(ResultOk,
+ client.createReader(topic + "-reader", MessageId::earliest(), readerConf, reader));
+ ASSERT_EQ(consumers.size(), 2);
+ ASSERT_TRUE(consumers[1].use_count() > 0);
+ LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
+
+ readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
+ ASSERT_EQ(readerWeakPtr.use_count(), 1);
+ LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
}
ASSERT_EQ(producers.size(), 1);
ASSERT_EQ(producers[0].use_count(), 0);
- ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(consumers.size(), 2);
ASSERT_EQ(consumers[0].use_count(), 0);
+ ASSERT_EQ(consumers[1].use_count(), 0);
+ ASSERT_EQ(readerWeakPtr.use_count(), 0);
client.close();
}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index 74aa1f7..2d9b558 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -25,6 +25,7 @@
#include "lib/ConsumerImpl.h"
#include "lib/PartitionedConsumerImpl.h"
#include "lib/MultiTopicsConsumerImpl.h"
+#include "lib/ReaderImpl.h"
using std::string;
@@ -79,6 +80,12 @@ class PulsarFriend {
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
}
+ static ConsumerImplPtr getConsumer(Reader reader) {
+ return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
+ }
+
+ static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }
+
static std::shared_ptr<PartitionedConsumerImpl> getPartitionedConsumerImplPtr(Consumer consumer) {
return std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_);
}
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index 8cd535c..bf15692 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -18,8 +18,8 @@
*/
#include <pulsar/Client.h>
#include <pulsar/Reader.h>
-#include "ReaderTest.h"
#include "HttpHelper.h"
+#include "PulsarFriend.h"
#include <gtest/gtest.h>
@@ -28,6 +28,7 @@
#include <lib/Latch.h>
#include <lib/LogUtils.h>
+#include <lib/ReaderImpl.h>
DECLARE_LOG_OBJECT()
using namespace pulsar;
@@ -423,50 +424,6 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
client.close();
}
-TEST(ReaderTest, testReferenceLeak) {
- Client client(serviceUrl);
-
- std::string topicName = "persistent://public/default/testReferenceLeak";
-
- Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
-
- for (int i = 0; i < 10; i++) {
- std::string content = "my-message-" + std::to_string(i);
- Message msg = MessageBuilder().setContent(content).build();
- ASSERT_EQ(ResultOk, producer.send(msg));
- }
-
- ReaderConfiguration readerConf;
- Reader reader;
- ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
-
- ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader);
- ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader);
-
- LOG_INFO("1 consumer use count " << consumerPtr.use_count());
- LOG_INFO("1 reader use count " << readerPtr.use_count());
-
- for (int i = 0; i < 10; i++) {
- Message msg;
- ASSERT_EQ(ResultOk, reader.readNext(msg));
-
- std::string content = msg.getDataAsString();
- std::string expected = "my-message-" + std::to_string(i);
- ASSERT_EQ(expected, content);
- }
-
- producer.close();
- reader.close();
- // will be released after exit this method.
- ASSERT_EQ(1, consumerPtr.use_count());
- ASSERT_EQ(1, readerPtr.use_count());
- client.close();
- // will be released after exit this method.
- ASSERT_EQ(1, consumerPtr.use_count());
- ASSERT_EQ(1, readerPtr.use_count());
-}
-
TEST(ReaderTest, testPartitionIndex) {
Client client(serviceUrl);
@@ -519,7 +476,7 @@ TEST(ReaderTest, testSubscriptionNameSetting) {
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
- ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+ ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());
reader.close();
client.close();
@@ -537,7 +494,7 @@ TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
- ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+ ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());
reader.close();
client.close();
diff --git a/pulsar-client-cpp/tests/ReaderTest.h b/pulsar-client-cpp/tests/ReaderTest.h
deleted file mode 100644
index fd0387f..0000000
--- a/pulsar-client-cpp/tests/ReaderTest.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "lib/ReaderImpl.h"
-#include <string>
-
-using std::string;
-
-namespace pulsar {
-class ReaderTest {
- public:
- static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); }
- static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) {
- return reader.impl_->getReaderImplWeakPtr();
- }
-};
-} // namespace pulsar