You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/13 08:35:20 UTC

[pulsar] branch branch-2.10 updated: [fix][cpp] Fix potential segfault when resending messages (#17395)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 25b691ba62e [fix][cpp] Fix potential segfault when resending messages (#17395)
25b691ba62e is described below

commit 25b691ba62e4166818cf63ef51c6ff808f0aa77e
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Sep 6 10:39:35 2022 +0800

    [fix][cpp] Fix potential segfault when resending messages (#17395)
    
    Fixes #17392
    
    ### Motivation
    
    All timers in `ProducerImpl` are `std::shared_ptr` objects that can be
    reset to `nullptr` in `ProducerImpl::cancelTimers`. This can lead to a
    null pointer dereference in some cases.
    
    See
    https://github.com/apache/pulsar/issues/17392#issuecomment-1233929427
    for the analysis.
    
    Generally it's not necessary to hold a nullable pointer to a timer.
    However, to resolve a cyclic reference issue, #5246 reset the shared
    pointers manually to reduce the reference count. This is not a good
    solution because it requires null checks on timers everywhere, and the
    null checks themselves are still subject to a race condition like:
    
    Thread 1:
    
    ```c++
    if (timer) {  // [1] timer is not nullptr
        timer->async_wait(/* ... */);  // [3] timer is null now, see [2] below
    }
    ```
    
    Thread 2:
    
    ```c++
    timer.reset();  // [2]
    ```
    
    The best solution is to capture a `weak_ptr` in each timer's callback
    and call `lock()` to check whether the referenced object is still alive.
    
    ### Modifications
    - Change the type of `sendTimer_` and `batchTimer_` to `deadline_timer`,
      not a `shared_ptr`.
    - Use `PeriodicTask` instead of the `deadline_timer` for token refresh.
    - Migrate `weak_from_this()` method from C++17 and capture
      `weak_from_this()` instead of `shared_from_this()` in callbacks.
    
    ### Verifying this change
    
    Run `testResendViaSendCallback` many times; with this patch applied it
    no longer fails.
    
    ```bash
    ./tests/main --gtest_filter='BasicEndToEndTest.testResendViaSendCallback' --gtest_repeat=30
    ```
    
    (cherry picked from commit 7d6f394b3c8ed693dc4ce1e138e9928930118fdb)
---
 pulsar-client-cpp/lib/PeriodicTask.cc |   9 ++-
 pulsar-client-cpp/lib/ProducerImpl.cc | 111 +++++++++++++++-------------------
 pulsar-client-cpp/lib/ProducerImpl.h  |  13 ++--
 3 files changed, 62 insertions(+), 71 deletions(-)

diff --git a/pulsar-client-cpp/lib/PeriodicTask.cc b/pulsar-client-cpp/lib/PeriodicTask.cc
index 533d38b5efa..4e91ef5f7e1 100644
--- a/pulsar-client-cpp/lib/PeriodicTask.cc
+++ b/pulsar-client-cpp/lib/PeriodicTask.cc
@@ -27,9 +27,14 @@ void PeriodicTask::start() {
     }
     state_ = Ready;
     if (periodMs_ >= 0) {
-        auto self = shared_from_this();
+        std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
         timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
-        timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
+        timer_.async_wait([weakSelf](const ErrorCode& ec) {
+            auto self = weakSelf.lock();
+            if (self) {
+                self->handleTimeout(ec);
+            }
+        });
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index a9868bca97a..383fe277c2b 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -56,7 +56,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
       producerStr_("[" + topic_ + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
       msgSequenceGenerator_(0),
-      dataKeyGenIntervalSec_(4 * 60 * 60),
+      batchTimer_(executor_->getIOService()),
+      sendTimer_(executor_->getIOService()),
+      dataKeyRefreshTask_(executor_->getIOService(), 4 * 60 * 60 * 1000),
       memoryLimitController_(client->getMemoryLimitController()),
       chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) {
     LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
@@ -102,7 +104,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
                 LOG_ERROR("Unknown batching type: " << conf_.getBatchingType());
                 return;
         }
-        batchTimer_ = executor_->createDeadlineTimer();
     }
 }
 
@@ -123,19 +124,6 @@ int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished
 
 const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
 
-void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
-    if (ec) {
-        LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
-        return;
-    }
-
-    msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
-
-    dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
-    dataKeyGenTImer_->async_wait(
-        std::bind(&pulsar::ProducerImpl::refreshEncryptionKey, shared_from_this(), std::placeholders::_1));
-}
-
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
@@ -200,11 +188,19 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         backoff_.reset();
         lock.unlock();
 
-        if (!dataKeyGenTImer_ && conf_.isEncryptionEnabled()) {
-            dataKeyGenTImer_ = executor_->createDeadlineTimer();
-            dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
-            dataKeyGenTImer_->async_wait(std::bind(&pulsar::ProducerImpl::refreshEncryptionKey,
-                                                   shared_from_this(), std::placeholders::_1));
+        if (conf_.isEncryptionEnabled()) {
+            auto weakSelf = weak_from_this();
+            dataKeyRefreshTask_.setCallback([this, weakSelf](const PeriodicTask::ErrorCode& ec) {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+                if (ec) {
+                    LOG_ERROR("DataKeyRefresh timer failed: " << ec.message());
+                    return;
+                }
+                msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
+            });
         }
 
         // if the producer is lazy the send timeout timer is already running
@@ -466,10 +462,29 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
         bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
         bool isFull = batchMessageContainer_->add(msg, callback);
         if (isFirstMessage) {
-            batchTimer_->expires_from_now(
+            batchTimer_.expires_from_now(
                 boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
-            batchTimer_->async_wait(std::bind(&ProducerImpl::batchMessageTimeoutHandler, shared_from_this(),
-                                              std::placeholders::_1));
+            auto weakSelf = weak_from_this();
+            batchTimer_.async_wait([this, weakSelf](const boost::system::error_code& ec) {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+                if (ec) {
+                    LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
+                    return;
+                }
+                LOG_DEBUG(getName() << " - Batch Message Timer expired");
+
+                // ignore if the producer is already closing/closed
+                const auto state = state_.load();
+                if (state == Pending || state == Ready) {
+                    Lock lock(mutex_);
+                    auto failures = batchMessageAndSend();
+                    lock.unlock();
+                    failures.complete();
+                }
+            });
         }
 
         if (isFull) {
@@ -560,7 +575,7 @@ void ProducerImpl::releaseSemaphoreForSendOp(const OpSendMsg& op) {
 PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCallback) {
     PendingFailures failures;
     LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
-    batchTimer_->cancel();
+    batchTimer_.cancel();
 
     batchMessageContainer_->processAndClear(
         [this, &failures](Result result, const OpSendMsg& opSendMsg) {
@@ -597,23 +612,6 @@ void ProducerImpl::sendMessage(const OpSendMsg& op) {
     }
 }
 
-void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& ec) {
-    if (ec) {
-        LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
-        return;
-    }
-    LOG_DEBUG(getName() << " - Batch Message Timer expired");
-
-    // ignore if the producer is already closing/closed
-    const auto state = state_.load();
-    if (state == Pending || state == Ready) {
-        Lock lock(mutex_);
-        auto failures = batchMessageAndSend();
-        lock.unlock();
-        failures.complete();
-    }
-}
-
 void ProducerImpl::printStats() {
     if (batchMessageContainer_) {
         LOG_INFO("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_
@@ -866,20 +864,9 @@ void ProducerImpl::shutdown() {
 }
 
 void ProducerImpl::cancelTimers() {
-    if (dataKeyGenTImer_) {
-        dataKeyGenTImer_->cancel();
-        dataKeyGenTImer_.reset();
-    }
-
-    if (batchTimer_) {
-        batchTimer_->cancel();
-        batchTimer_.reset();
-    }
-
-    if (sendTimer_) {
-        sendTimer_->cancel();
-        sendTimer_.reset();
-    }
+    dataKeyRefreshTask_.stop();
+    batchTimer_.cancel();
+    sendTimer_.cancel();
 }
 
 bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
@@ -894,20 +881,16 @@ uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1
 
 bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
 void ProducerImpl::startSendTimeoutTimer() {
-    // Initialize the sendTimer only once per producer and only when producer timeout is
-    // configured. Set the timeout as configured value and asynchronously wait for the
-    // timeout to happen.
-    if (!sendTimer_ && conf_.getSendTimeout() > 0) {
-        sendTimer_ = executor_->createDeadlineTimer();
+    if (conf_.getSendTimeout() > 0) {
         asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
     }
 }
 
 void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
-    sendTimer_->expires_from_now(expiryTime);
+    sendTimer_.expires_from_now(expiryTime);
 
-    ProducerImplBaseWeakPtr weakSelf = shared_from_this();
-    sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+    auto weakSelf = weak_from_this();
+    sendTimer_.async_wait([weakSelf](const boost::system::error_code& err) {
         auto self = weakSelf.lock();
         if (self) {
             std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
@@ -915,5 +898,7 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
     });
 }
 
+ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); }
+
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index a8f35a619c9..43b33933934 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -35,6 +35,7 @@
 #include "BatchMessageContainerBase.h"
 #include "PendingFailures.h"
 #include "Semaphore.h"
+#include "PeriodicTask.h"
 
 using namespace pulsar;
 
@@ -86,6 +87,9 @@ class ProducerImpl : public HandlerBase,
 
     static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);
 
+    // NOTE: this method is introduced into `enable_shared_from_this` since C++17
+    ProducerImplWeakPtr weak_from_this() noexcept;
+
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
 
@@ -95,8 +99,6 @@ class ProducerImpl : public HandlerBase,
 
     void sendMessage(const OpSendMsg& opSendMsg);
 
-    void batchMessageTimeoutHandler(const boost::system::error_code& ec);
-
     void startSendTimeoutTimer();
 
     friend class PulsarFriend;
@@ -162,13 +164,13 @@ class ProducerImpl : public HandlerBase,
     proto::BaseCommand cmd_;
 
     std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
-    DeadlineTimerPtr batchTimer_;
+    boost::asio::deadline_timer batchTimer_;
     PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);
 
     volatile int64_t lastSequenceIdPublished_;
     std::string schemaVersion_;
 
-    DeadlineTimerPtr sendTimer_;
+    boost::asio::deadline_timer sendTimer_;
     void handleSendTimeout(const boost::system::error_code& err);
     using DurationType = typename boost::asio::deadline_timer::duration_type;
     void asyncWaitSendTimeout(DurationType expiryTime);
@@ -182,8 +184,7 @@ class ProducerImpl : public HandlerBase,
     void failPendingMessages(Result result, bool withLock);
 
     MessageCryptoPtr msgCrypto_;
-    DeadlineTimerPtr dataKeyGenTImer_;
-    uint32_t dataKeyGenIntervalSec_;
+    PeriodicTask dataKeyRefreshTask_;
 
     MemoryLimitController& memoryLimitController_;
     const bool chunkingEnabled_;