Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/20 14:59:16 UTC

[GitHub] [pulsar] BewareMyPower edited a comment on issue #7851: pulsar-client-cpp crashes consumer due to thread safty

BewareMyPower edited a comment on issue #7851:
URL: https://github.com/apache/pulsar/issues/7851#issuecomment-677716498


   Though making `clear()` thread-safe may be better, **I'm not sure the crash is caused by this**.
   
   First, a correction: `clear()` is called only when the connection is established:
   
   ```c++
   void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
       /* ... */
   
       unAckedMessageTrackerPtr_->clear();
       batchAcknowledgementTracker_.clear();
   
       /* ... */
       uint64_t requestId = client->newRequestId();
       SharedBuffer cmd = Commands::newSubscribe(
           topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
           startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition());
       // trigger the `handleCreateConsumer()` method after receiving response from broker
       cnx->sendRequestWithId(cmd, requestId)
           .addListener(
               std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
   }
   ```
   
   The future returned by `getConsumerCreatedFuture()` is completed in `handleCreateConsumer`, which runs after `connectionOpened`:
   
   ```c++
   void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
       static bool firstTime = true;
       if (result == ResultOk) {
           /* ... */
           consumerCreatedPromise_.setValue(shared_from_this());  // getConsumerCreatedFuture() is completed
       } else {
   ```
   
   `ClientImpl::handleConsumerCreated` is then called, because `handleSubscribe` registered it as a listener on that future:
   
   ```c++
   void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr partitionMetadata,
                                    TopicNamePtr topicName, const std::string& consumerName,
                                    ConsumerConfiguration conf, SubscribeCallback callback) {
       if (result == ResultOk) {
           /* ... */
           // `handleConsumerCreated` is called when getConsumerCreatedFuture() is completed
           consumer->getConsumerCreatedFuture().addListener(
               std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
                         std::placeholders::_2, callback, consumer));
           Lock lock(mutex_);
           consumers_.push_back(consumer);
           lock.unlock();
           consumer->start();
       } else {
   ```
   
   ```c++
   void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                          SubscribeCallback callback, ConsumerImplBasePtr consumer) {
       callback(result, Consumer(consumer));  // Finally, the consumer argument of `Client::subscribe` is set to this consumer
   }
   ```
   
   Therefore, the consumer can only call `acknowledge` or `receive` after `subscribe` has returned, so the race condition cannot happen even if the trackers' `clear()` is not thread-safe.
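   
   To make the ordering argument concrete, here is a minimal single-threaded sketch of the call chain described above. It is not the client's real code: `CreatedPromise`, `SketchConsumer` and `runSubscribeSequence` are hypothetical names, the trackers are reduced to log entries, and it models only the initial subscribe path, without real threads or a broker.
   
   ```c++
   #include <cassert>
   #include <functional>
   #include <string>
   #include <utility>
   #include <vector>
   
   // Hypothetical stand-in for the promise/future pair: listeners run when
   // setValue() is called, or immediately if the value is already set.
   class CreatedPromise {
      public:
       using Listener = std::function<void()>;
   
       void addListener(Listener listener) {
           if (completed_) {
               listener();
               return;
           }
           listeners_.push_back(std::move(listener));
       }
   
       void setValue() {
           completed_ = true;
           for (auto& listener : listeners_) {
               listener();
           }
           listeners_.clear();
       }
   
      private:
       bool completed_ = false;
       std::vector<Listener> listeners_;
   };
   
   // Hypothetical consumer that only records the order of events.
   struct SketchConsumer {
       std::vector<std::string> events;
       CreatedPromise created;
   
       // Mirrors connectionOpened(): clear the trackers, then send the subscribe request.
       void connectionOpened() {
           events.push_back("clear");
           events.push_back("send-subscribe");
       }
   
       // Mirrors handleCreateConsumer(): runs on the broker's response and completes the future.
       void handleCreateConsumer() {
           events.push_back("created");
           created.setValue();
       }
   };
   
   // Replays the sequence and returns the recorded order of events.
   std::vector<std::string> runSubscribeSequence() {
       SketchConsumer consumer;
       // Mirrors handleSubscribe(): the user callback is registered as a listener
       // before the consumer is started.
       consumer.created.addListener([&consumer] { consumer.events.push_back("user-callback"); });
       consumer.connectionOpened();
       consumer.handleCreateConsumer();
       assert(!consumer.events.empty());
       return consumer.events;
   }
   ```
   
   In this sketch the user callback is always recorded after `clear`, which is the point of the argument: user code receives the consumer only once the future has been completed.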

