You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/12 01:57:58 UTC
[pulsar] 02/02: [C++] Fix potential crash caused by
AckGroupTracker's timer (#8519)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 98591c46de0294f6f3cd46ac359ae13a97ab862b
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Nov 12 09:45:32 2020 +0800
[C++] Fix potential crash caused by AckGroupTracker's timer (#8519)
### Motivation
The timer callback of `AckGroupingTrackerEnabled` captures only `this`, which is a raw, non-owning pointer to the `AckGroupingTrackerEnabled` instance. If the instance goes out of scope and is destroyed, `this` becomes a dangling pointer.
Even though the destructor of `AckGroupingTrackerEnabled` cancels the timer, cancellation does not guarantee that the callback sees an error. If the timer has already expired and its handler is queued, the handler can no longer be cancelled. The callback then still runs with a zero (success) error code after the instance is gone, and dereferencing `this` is invalid. For example, the following crash, caused by this callback, was observed in a production environment and is hard to reproduce:
```
#6 <signal handler called>
#7 0x00007fb4e67c5cb8 in ?? ()
#8 0x00007fb604981adb in operator() (ec=..., __closure=0x7fb52b0fb230)
at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc:148
#9 operator() (this=0x7fb52b0fb230) at /usr/local/include/boost/asio/detail/bind_handler.hpp:47
```
### Modifications
- Hold the ACK grouping tracker in a `std::shared_ptr` instead of a `std::unique_ptr`, and capture that shared pointer in the timer callback's lambda so the tracker stays alive until the callback has run.
- Add a `start()` method to `AckGroupingTracker` so the timer is scheduled after construction. Calling `shared_from_this()` inside a constructor fails (it throws `std::bad_weak_ptr` since C++17) because no `std::shared_ptr` owns the object yet.
- Reference `HandlerBase` through a `std::weak_ptr`, because the handler may already have been destroyed when the timer callback runs.
(cherry picked from commit cfa65d0838c44271d5cdcf4b4f5dcd5c5498a3e3)
---
pulsar-client-cpp/lib/AckGroupingTracker.h | 9 ++-
pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc | 21 ++++---
pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h | 7 ++-
pulsar-client-cpp/lib/ConsumerImpl.cc | 41 +++++++------
pulsar-client-cpp/lib/ConsumerImpl.h | 2 +-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 70 ++++++++++++----------
pulsar-client-cpp/tests/PulsarFriend.h | 4 ++
7 files changed, 90 insertions(+), 64 deletions(-)
diff --git a/pulsar-client-cpp/lib/AckGroupingTracker.h b/pulsar-client-cpp/lib/AckGroupingTracker.h
index a5b3d66..f4410e4 100644
--- a/pulsar-client-cpp/lib/AckGroupingTracker.h
+++ b/pulsar-client-cpp/lib/AckGroupingTracker.h
@@ -35,12 +35,17 @@ namespace pulsar {
* Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers.
* It can be directly used by consumers for non-persistent topics.
*/
-class AckGroupingTracker {
+class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
public:
AckGroupingTracker() = default;
virtual ~AckGroupingTracker() = default;
/**
+ * Start tracking the ACK requests.
+ */
+ virtual void start() {}
+
+ /**
* Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
* discard messages that are being resent after a disconnection and for which the user has
* already sent an acknowledgement.
@@ -102,7 +107,7 @@ class AckGroupingTracker {
const std::set<MessageId>& msgIds);
}; // class AckGroupingTracker
-using AckGroupingTrackerScopedPtr = std::unique_ptr<AckGroupingTracker>;
+using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;
} // namespace pulsar
#endif /* LIB_ACKGROUPINGTRACKER_H_ */
diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
index 5c61cca..a651c7a 100644
--- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
@@ -33,11 +33,11 @@ namespace pulsar {
DECLARE_LOG_OBJECT();
-AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler,
- uint64_t consumerId, long ackGroupingTimeMs,
- long ackGroupingMaxSize)
+AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
+ const HandlerBasePtr& handlerPtr, uint64_t consumerId,
+ long ackGroupingTimeMs, long ackGroupingMaxSize)
: AckGroupingTracker(),
- handler_(handler),
+ handlerWeakPtr_(handlerPtr),
consumerId_(consumerId),
nextCumulativeAckMsgId_(MessageId::earliest()),
requireCumulativeAck_(false),
@@ -51,9 +51,10 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, Ha
mutexTimer_() {
LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "
<< ackGroupingMaxSize);
- this->scheduleTimer();
}
+void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }
+
bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
{
// Check if the message ID is already ACKed by a previous (or pending) cumulative request.
@@ -94,7 +95,12 @@ void AckGroupingTrackerEnabled::close() {
}
void AckGroupingTrackerEnabled::flush() {
- auto cnx = this->handler_.getCnx().lock();
+ auto handler = handlerWeakPtr_.lock();
+ if (!handler) {
+ LOG_WARN("Reference to the HandlerBase is not valid.");
+ return;
+ }
+ auto cnx = handler->getCnx().lock();
if (cnx == nullptr) {
LOG_DEBUG("Connection is not ready, grouping ACK failed.");
return;
@@ -143,7 +149,8 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
- this->timer_->async_wait([this](const boost::system::error_code& ec) -> void {
+ auto self = shared_from_this();
+ this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
if (!ec) {
this->flush();
this->scheduleTimer();
diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
index 9fb871a..c3926aa 100644
--- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
@@ -42,14 +42,15 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
/**
* Constructing ACK grouping tracker for peresistent topics.
* @param[in] clientPtr pointer to client object.
- * @param[in] handler the connection handler.
+ * @param[in] handlerPtr the shared pointer to connection handler.
* @param[in] consumerId consumer ID that this tracker belongs to.
* @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
* @param[in] ackGroupingMaxSize max. number of ACK requests can be grouped.
*/
- AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler, uint64_t consumerId,
+ AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& handlerPtr, uint64_t consumerId,
long ackGroupingTimeMs, long ackGroupingMaxSize);
+ void start() override;
bool isDuplicate(const MessageId& msgId) override;
void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;
@@ -62,7 +63,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
void scheduleTimer();
//! The connection handler.
- HandlerBase& handler_;
+ HandlerBaseWeakPtr handlerWeakPtr_;
//! ID of the consumer that this tracker belongs to.
uint64_t consumerId_;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index dedd9b2..e85fbf1 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -62,6 +62,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
brokerConsumerStats_(),
consumerStatsBasePtr_(),
negativeAcksTracker_(client, *this, conf),
+ ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
msgCrypto_(),
readCompacted_(conf.isReadCompacted()),
lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
@@ -102,23 +103,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
if (conf.isEncryptionEnabled()) {
msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
}
-
- // Initialize ACK grouping tracker.
- if (TopicName::get(topic)->isPersistent()) {
- // Persistent topic, ACK requests need to be sent to broker.
- if (conf.getAckGroupingTimeMs() > 0) {
- // Grouping ACK is ENABLED because grouping time value is a positive value.
- this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
- client, *this, this->consumerId_, conf.getAckGroupingTimeMs(), conf.getAckGroupingMaxSize()));
- } else {
- // Grouping ACK is DISABLED because grouping time value is a non-positive value.
- this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, this->consumerId_));
- }
- } else {
- // Non-persistent topic, ACK requests do NOT need to be sent to broker.
- LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
- this->ackGroupingTrackerPtr_.reset(new AckGroupingTracker());
- }
}
ConsumerImpl::~ConsumerImpl() {
@@ -143,7 +127,24 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu
const std::string& ConsumerImpl::getTopic() const { return topic_; }
-void ConsumerImpl::start() { grabCnx(); }
+void ConsumerImpl::start() {
+ HandlerBase::start();
+
+ // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
+ // constructor completed.
+ if (TopicName::get(topic_)->isPersistent()) {
+ if (config_.getAckGroupingTimeMs() > 0) {
+ ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
+ client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
+ config_.getAckGroupingMaxSize()));
+ } else {
+ ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
+ }
+ } else {
+ LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
+ }
+ ackGroupingTrackerPtr_->start();
+}
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
@@ -871,7 +872,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
state_ = Closing;
// Flush pending grouped ACK requests.
- this->ackGroupingTrackerPtr_->close();
+ if (ackGroupingTrackerPtr_) {
+ ackGroupingTrackerPtr_->close();
+ }
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 9306a91..e122a4a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -196,7 +196,7 @@ class ConsumerImpl : public ConsumerImplBase,
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
- AckGroupingTrackerScopedPtr ackGroupingTrackerPtr_;
+ AckGroupingTrackerPtr ackGroupingTrackerPtr_;
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index acad16a..e3ae2f2 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3513,6 +3513,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) {
// Send ACK.
AckGroupingTrackerMock tracker(false);
+ tracker.start();
for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
auto connPtr = connWeakPtr.lock();
ASSERT_NE(connPtr, nullptr);
@@ -3563,6 +3564,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) {
// Send ACK.
AckGroupingTrackerMock tracker(false);
+ tracker.start();
std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
ASSERT_EQ(restMsgId.size(), numMsg);
ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, consumerImpl.getConsumerId(), restMsgId));
@@ -3669,9 +3671,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) {
class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled {
public:
- AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, HandlerBase &handler, uint64_t consumerId,
- long ackGroupingTimeMs, long ackGroupingMaxSize)
- : AckGroupingTrackerEnabled(clientPtr, handler, consumerId, ackGroupingTimeMs, ackGroupingMaxSize) {}
+ AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const HandlerBasePtr &handlerPtr,
+ uint64_t consumerId, long ackGroupingTimeMs, long ackGroupingMaxSize)
+ : AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId, ackGroupingTimeMs,
+ ackGroupingMaxSize) {}
const std::set<MessageId> &getPendingIndividualAcks() { return this->pendingIndividualAcks_; }
const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; }
const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; }
@@ -3698,7 +3701,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
- auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+ auto consumerImpl = PulsarFriend::getConsumerImplPtr(consumer);
// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
@@ -3713,22 +3716,23 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
recvMsgId.emplace_back(msg.getMessageId());
}
- AckGroupingTrackerEnabledMock tracker(clientImplPtr, consumerImpl, consumerImpl.getConsumerId(),
- ackGroupingTimeMs, ackGroupingMaxSize);
- ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
- ASSERT_EQ(tracker.getAckGroupingTimeMs(), ackGroupingTimeMs);
- ASSERT_EQ(tracker.getAckGroupingMaxSize(), ackGroupingMaxSize);
+ auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
+ clientImplPtr, consumerImpl, consumerImpl->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+ tracker->start();
+ ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
+ ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
+ ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
for (auto &msgId : recvMsgId) {
- ASSERT_FALSE(tracker.isDuplicate(msgId));
- tracker.addAcknowledge(msgId);
- ASSERT_TRUE(tracker.isDuplicate(msgId));
+ ASSERT_FALSE(tracker->isDuplicate(msgId));
+ tracker->addAcknowledge(msgId);
+ ASSERT_TRUE(tracker->isDuplicate(msgId));
}
- ASSERT_EQ(tracker.getPendingIndividualAcks().size(), recvMsgId.size());
+ ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size());
std::this_thread::sleep_for(std::chrono::seconds(2));
- ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
+ ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
for (auto &msgId : recvMsgId) {
- ASSERT_FALSE(tracker.isDuplicate(msgId));
+ ASSERT_FALSE(tracker->isDuplicate(msgId));
}
consumer.close();
@@ -3757,7 +3761,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
- auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
+ auto consumerImpl0 = PulsarFriend::getConsumerImplPtr(consumer);
// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
@@ -3773,32 +3777,33 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
}
std::sort(recvMsgId.begin(), recvMsgId.end());
- AckGroupingTrackerEnabledMock tracker0(clientImplPtr, consumerImpl0, consumerImpl0.getConsumerId(),
- ackGroupingTimeMs, ackGroupingMaxSize);
- ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), MessageId::earliest());
- ASSERT_FALSE(tracker0.requireCumulativeAck());
+ auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
+ clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+ tracker0->start();
+ ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
+ ASSERT_FALSE(tracker0->requireCumulativeAck());
auto targetMsgId = recvMsgId[numMsg / 2];
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
- ASSERT_FALSE(tracker0.isDuplicate(recvMsgId[idx]));
+ ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx]));
}
- tracker0.addAcknowledgeCumulative(targetMsgId);
+ tracker0->addAcknowledgeCumulative(targetMsgId);
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
- ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
+ ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
}
- ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), targetMsgId);
- ASSERT_TRUE(tracker0.requireCumulativeAck());
+ ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), targetMsgId);
+ ASSERT_TRUE(tracker0->requireCumulativeAck());
std::this_thread::sleep_for(std::chrono::seconds(2));
- ASSERT_FALSE(tracker0.requireCumulativeAck());
+ ASSERT_FALSE(tracker0->requireCumulativeAck());
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
- ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
+ ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
}
consumer.close();
std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
- auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
+ auto consumerImpl1 = PulsarFriend::getConsumerImplPtr(consumer);
std::set<MessageId> restMsgId(recvMsgId.begin() + numMsg / 2 + 1, recvMsgId.end());
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
@@ -3808,10 +3813,11 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString();
- AckGroupingTrackerEnabledMock tracker1(clientImplPtr, consumerImpl1, consumerImpl1.getConsumerId(),
- ackGroupingTimeMs, ackGroupingMaxSize);
- tracker1.addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
- tracker1.close();
+ auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
+ clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+ tracker1->start();
+ tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
+ tracker1->close();
consumer.close();
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index 26a1fe8..87b48d2 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -82,6 +82,10 @@ class PulsarFriend {
return *consumerImpl;
}
+ static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
+ return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
+ }
+
static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {