You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/13 08:35:20 UTC
[pulsar] branch branch-2.10 updated: [fix][cpp] Fix potential segfault when resending messages (#17395)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 25b691ba62e [fix][cpp] Fix potential segfault when resending messages (#17395)
25b691ba62e is described below
commit 25b691ba62e4166818cf63ef51c6ff808f0aa77e
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Sep 6 10:39:35 2022 +0800
[fix][cpp] Fix potential segfault when resending messages (#17395)
Fixes #17392
### Motivation
All timers in `ProducerImpl` are `std::shared_ptr` objects that can be
reset to `nullptr` in `ProducerImpl::cancelTimers`. This could lead to a
null pointer dereference in some cases.
See
https://github.com/apache/pulsar/issues/17392#issuecomment-1233929427
for the analysis.
Generally it's not necessary to hold a nullable pointer to the timer.
However, to resolve the cyclic reference issue, #5246 reset the shared
pointer manually to reduce the reference count. It's not a good solution
because we have to perform null checks on the timers everywhere, and even
then the null check is subject to a race condition like the following:
Thread 1:
```c++
if (timer) { // [1] timer is not nullptr
timer->async_wait(/* ... */); // [3] timer is null now, see [2] below
}
```
Thread 2:
```c++
timer.reset(); // [2]
```
The best solution is to capture a `weak_ptr` in the timer's callback and
call `lock()` to check whether the referenced object is still valid.
### Modifications
- Change the types of `sendTimer_` and `batchTimer_` from `shared_ptr`
to plain `deadline_timer` members.
- Use `PeriodicTask` instead of a `deadline_timer` for the encryption
data key refresh.
- Backport the C++17 `weak_from_this()` method and capture
`weak_from_this()` instead of `shared_from_this()` in callbacks.
### Verifying this change
Run `testResendViaSendCallback` many times; after this patch it no
longer fails.
```bash
./tests/main --gtest_filter='BasicEndToEndTest.testResendViaSendCallback' --gtest_repeat=30
```
(cherry picked from commit 7d6f394b3c8ed693dc4ce1e138e9928930118fdb)
---
pulsar-client-cpp/lib/PeriodicTask.cc | 9 ++-
pulsar-client-cpp/lib/ProducerImpl.cc | 111 +++++++++++++++-------------------
pulsar-client-cpp/lib/ProducerImpl.h | 13 ++--
3 files changed, 62 insertions(+), 71 deletions(-)
diff --git a/pulsar-client-cpp/lib/PeriodicTask.cc b/pulsar-client-cpp/lib/PeriodicTask.cc
index 533d38b5efa..4e91ef5f7e1 100644
--- a/pulsar-client-cpp/lib/PeriodicTask.cc
+++ b/pulsar-client-cpp/lib/PeriodicTask.cc
@@ -27,9 +27,14 @@ void PeriodicTask::start() {
}
state_ = Ready;
if (periodMs_ >= 0) {
- auto self = shared_from_this();
+ std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
- timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
+ timer_.async_wait([weakSelf](const ErrorCode& ec) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleTimeout(ec);
+ }
+ });
}
}
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index a9868bca97a..383fe277c2b 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -56,7 +56,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
- dataKeyGenIntervalSec_(4 * 60 * 60),
+ batchTimer_(executor_->getIOService()),
+ sendTimer_(executor_->getIOService()),
+ dataKeyRefreshTask_(executor_->getIOService(), 4 * 60 * 60 * 1000),
memoryLimitController_(client->getMemoryLimitController()),
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
@@ -102,7 +104,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
LOG_ERROR("Unknown batching type: " << conf_.getBatchingType());
return;
}
- batchTimer_ = executor_->createDeadlineTimer();
}
}
@@ -123,19 +124,6 @@ int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished
const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
-void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
- if (ec) {
- LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
- return;
- }
-
- msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
-
- dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
- dataKeyGenTImer_->async_wait(
- std::bind(&pulsar::ProducerImpl::refreshEncryptionKey, shared_from_this(), std::placeholders::_1));
-}
-
void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
@@ -200,11 +188,19 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
backoff_.reset();
lock.unlock();
- if (!dataKeyGenTImer_ && conf_.isEncryptionEnabled()) {
- dataKeyGenTImer_ = executor_->createDeadlineTimer();
- dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
- dataKeyGenTImer_->async_wait(std::bind(&pulsar::ProducerImpl::refreshEncryptionKey,
- shared_from_this(), std::placeholders::_1));
+ if (conf_.isEncryptionEnabled()) {
+ auto weakSelf = weak_from_this();
+ dataKeyRefreshTask_.setCallback([this, weakSelf](const PeriodicTask::ErrorCode& ec) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ if (ec) {
+ LOG_ERROR("DataKeyRefresh timer failed: " << ec.message());
+ return;
+ }
+ msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
+ });
}
// if the producer is lazy the send timeout timer is already running
@@ -466,10 +462,29 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
bool isFull = batchMessageContainer_->add(msg, callback);
if (isFirstMessage) {
- batchTimer_->expires_from_now(
+ batchTimer_.expires_from_now(
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
- batchTimer_->async_wait(std::bind(&ProducerImpl::batchMessageTimeoutHandler, shared_from_this(),
- std::placeholders::_1));
+ auto weakSelf = weak_from_this();
+ batchTimer_.async_wait([this, weakSelf](const boost::system::error_code& ec) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ if (ec) {
+ LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
+ return;
+ }
+ LOG_DEBUG(getName() << " - Batch Message Timer expired");
+
+ // ignore if the producer is already closing/closed
+ const auto state = state_.load();
+ if (state == Pending || state == Ready) {
+ Lock lock(mutex_);
+ auto failures = batchMessageAndSend();
+ lock.unlock();
+ failures.complete();
+ }
+ });
}
if (isFull) {
@@ -560,7 +575,7 @@ void ProducerImpl::releaseSemaphoreForSendOp(const OpSendMsg& op) {
PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCallback) {
PendingFailures failures;
LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
- batchTimer_->cancel();
+ batchTimer_.cancel();
batchMessageContainer_->processAndClear(
[this, &failures](Result result, const OpSendMsg& opSendMsg) {
@@ -597,23 +612,6 @@ void ProducerImpl::sendMessage(const OpSendMsg& op) {
}
}
-void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& ec) {
- if (ec) {
- LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
- return;
- }
- LOG_DEBUG(getName() << " - Batch Message Timer expired");
-
- // ignore if the producer is already closing/closed
- const auto state = state_.load();
- if (state == Pending || state == Ready) {
- Lock lock(mutex_);
- auto failures = batchMessageAndSend();
- lock.unlock();
- failures.complete();
- }
-}
-
void ProducerImpl::printStats() {
if (batchMessageContainer_) {
LOG_INFO("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_
@@ -866,20 +864,9 @@ void ProducerImpl::shutdown() {
}
void ProducerImpl::cancelTimers() {
- if (dataKeyGenTImer_) {
- dataKeyGenTImer_->cancel();
- dataKeyGenTImer_.reset();
- }
-
- if (batchTimer_) {
- batchTimer_->cancel();
- batchTimer_.reset();
- }
-
- if (sendTimer_) {
- sendTimer_->cancel();
- sendTimer_.reset();
- }
+ dataKeyRefreshTask_.stop();
+ batchTimer_.cancel();
+ sendTimer_.cancel();
}
bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
@@ -894,20 +881,16 @@ uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1
bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
void ProducerImpl::startSendTimeoutTimer() {
- // Initialize the sendTimer only once per producer and only when producer timeout is
- // configured. Set the timeout as configured value and asynchronously wait for the
- // timeout to happen.
- if (!sendTimer_ && conf_.getSendTimeout() > 0) {
- sendTimer_ = executor_->createDeadlineTimer();
+ if (conf_.getSendTimeout() > 0) {
asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
}
}
void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
- sendTimer_->expires_from_now(expiryTime);
+ sendTimer_.expires_from_now(expiryTime);
- ProducerImplBaseWeakPtr weakSelf = shared_from_this();
- sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+ auto weakSelf = weak_from_this();
+ sendTimer_.async_wait([weakSelf](const boost::system::error_code& err) {
auto self = weakSelf.lock();
if (self) {
std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
@@ -915,5 +898,7 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
});
}
+ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); }
+
} // namespace pulsar
/* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index a8f35a619c9..43b33933934 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -35,6 +35,7 @@
#include "BatchMessageContainerBase.h"
#include "PendingFailures.h"
#include "Semaphore.h"
+#include "PeriodicTask.h"
using namespace pulsar;
@@ -86,6 +87,9 @@ class ProducerImpl : public HandlerBase,
static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);
+ // NOTE: this method is introduced into `enable_shared_from_this` since C++17
+ ProducerImplWeakPtr weak_from_this() noexcept;
+
protected:
ProducerStatsBasePtr producerStatsBasePtr_;
@@ -95,8 +99,6 @@ class ProducerImpl : public HandlerBase,
void sendMessage(const OpSendMsg& opSendMsg);
- void batchMessageTimeoutHandler(const boost::system::error_code& ec);
-
void startSendTimeoutTimer();
friend class PulsarFriend;
@@ -162,13 +164,13 @@ class ProducerImpl : public HandlerBase,
proto::BaseCommand cmd_;
std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
- DeadlineTimerPtr batchTimer_;
+ boost::asio::deadline_timer batchTimer_;
PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);
volatile int64_t lastSequenceIdPublished_;
std::string schemaVersion_;
- DeadlineTimerPtr sendTimer_;
+ boost::asio::deadline_timer sendTimer_;
void handleSendTimeout(const boost::system::error_code& err);
using DurationType = typename boost::asio::deadline_timer::duration_type;
void asyncWaitSendTimeout(DurationType expiryTime);
@@ -182,8 +184,7 @@ class ProducerImpl : public HandlerBase,
void failPendingMessages(Result result, bool withLock);
MessageCryptoPtr msgCrypto_;
- DeadlineTimerPtr dataKeyGenTImer_;
- uint32_t dataKeyGenIntervalSec_;
+ PeriodicTask dataKeyRefreshTask_;
MemoryLimitController& memoryLimitController_;
const bool chunkingEnabled_;