You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/19 12:26:39 UTC

[GitHub] [pulsar] klwilson227 opened a new issue #7851: pulsar-client-cpp crashes consumer due to thread safty

klwilson227 opened a new issue #7851:
URL: https://github.com/apache/pulsar/issues/7851


   **Describe the bug**
   When running pulsar-client-cpp 2.5.2, we are experiencing the following stack trace. Our code instantiates and holds open a client, then time-slices the consumers across a number of threads. Each thread subscribes, consumes a message followed by an immediate acknowledge, then closes the consumer. The crash has occurred at intervals ranging from a few hours to a month. We finally received a core file for the crash shown below:
   
   #0  0x00007f03272b02c7 in raise () from /lib64/libc.so.6
   #1  0x00007f03272b19b8 in abort () from /lib64/libc.so.6
   #2  0x0000000004075565 in Basics::Backtrace::DoCoreDump(char const*, bool) ()
   #3  0x00000000040c95c5 in Basics::GlobalSignalHandlers::logCoreDump() ()
   #4  0x00000000040ca6a5 in Basics::sigHandler_withinATryCatch(int, siginfo*, void*) ()
   #5  0x00000000040ca77e in Basics::sigHandler(int, siginfo*, void*) ()
   #6  <signal handler called>
   #7  0x00007f018313414e in pulsar::MessageId::operator< (this=0x7f030ea6c9e8, other=
         @0x7f00ebffaf00: {impl_ = std::shared_ptr (count 3, weak 0) 0x7f030f68b538})
       at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/MessageId.cc:100
   #8  0x00007f01831c63f0 in operator() (this=0x7f030ea6c9c0, __y=@0x7f00ebffaf00: {impl_ = std::shared_ptr (count 3, weak 0) 0x7f030f68b538},
       __x=<optimized out>) at /usr/include/c++/4.8.2/bits/stl_function.h:235
   #9  std::_Rb_tree<pulsar::MessageId, std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > >, std::_Select1st<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >, std::less<pulsar::MessageId>, std::allocator<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > > >::_M_get_insert_hint_unique_pos (
       this=this@entry=0x7f030ea6c9c0, __position=<optimized out>, __k=@0x7f00ebffaf00: {impl_ = std::shared_ptr (count 3, weak 0) 0x7f030f68b538})
       at /usr/include/c++/4.8.2/bits/stl_tree.h:1422
   #10 0x00007f01831c6419 in std::_Rb_tree<pulsar::MessageId, std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > >, std::_Select1st<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >, std::less<pulsar::MessageId>, std::allocator<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > > >::_M_insert_unique_<std::pair<pulsar::MessageId, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >(std::_Rb_tree_const_iterator<std::pair<pulsar::MessageId const, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > > >, std::pair<pulsar::MessageId, boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> > >&&) (this=this@entry=0x7f030ea6c9c0, __position=<optimized out>, __position@entry=
           {first = {impl_ = std::shared_ptr (count -660842048, weak 32512) 0xffffffffffffffff}, second = {static bits_per_block = <error reading variable: No global symbol "boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> >::bits_per_block".>, static npos = <optimized out>, static ulong_width = <error reading variable: No global symbol "boost::dynamic_bitset<unsigned long, std::allocator<unsigned long> >::ulong_width".>, m_bits = std::vector of length 128, capacity -25369645 = {139645905820120, 139645905820096, 139645903947864, 139645903947840, 139645895612888, 139645895612864, 139645903478456, 139645903478432, 139645905415224, 139645905415200, 139645907124328, 139645907124304, 139645897524632, 139645897524608, 139645896941944, 139645896941920, 139645903479768, 139645903479744, 139645897213704, 139645897213680, 139645897213832, 139645897213808, 139645902580088, 139645902580064, 139645902579928, 139645902579904, 139645896941448, 139645896941424, 139645897213320, 139
 645897213296, 139645901676504, 139645901676480, 139645899269576, 139645899269552, 139645905827016, 139645905826992, 139645898799784, 139645898799760, 139645901615560, 139645901615536, 139645903352184, 139645903352160, 139645905772872, 139645905772848, 139645899479656, 139645899479632, 139645905308152, 139645905308128, 139645897513528, 139645897513504, 139645896668632, 139645896668608, 139645898799128, 139645898799104, 139645905331112, 139645905331088, 139645905331304, 139645905331280, 139645898799256, 139645898799232, 139645899213224, 139645899213200, 139645905418312, 139645905418288, 139645903262888, 139645903262864, 139645896344696, 139645896344672, 139645899177384, 139645899177360, 139645905401368, 139645905401344, 139645907120840, 139645907120816, 139645896458248, 139645896458224, 139645898790536, 139645898790512, 139645897192552, 139645897192528, 139645899378632, 139645899378608, 112513136, 216, 112528512, 433, 139645899296624, 139645900272320, 139645898787520, 139645898012976,
  112486928, 61818273202178, 112528416, 273, 139645895573880, 139645895573880, 0, 0, 45035996273704978, 45035996273704978, 1, 114046456, 0, 0, 0, 139645898784768, 114046456, 139645898012928, 0, 0, 139645897425536, 139642271694848, 0, 139645897425560, 139645897425560, 0, 114046456, 139645897424640, 0, 0, 139645897424848, 139642271694848, 0, 139645897425640, 139645897425640, 0, 336, 68}, m_num_bits = 139645694467488}},
       __v=__v@entry=<unknown type in /usr/lib/debug/usr/local/itom-di-pulsarudx/lib/libitom-di-pulsarudx-9.2.1.so.debug, CU 0x59efd9, DIE 0x5aa2ab>)
       at /usr/include/c++/4.8.2/bits/stl_tree.h:1478
   #11 0x00007f01831c4d19 in insert<std::pair<pulsar::MessageId, boost::dynamic_bitset<> >, void> (
       __x=<unknown type in /usr/lib/debug/usr/local/itom-di-pulsarudx/lib/libitom-di-pulsarudx-9.2.1.so.debug, CU 0x59efd9, DIE 0x5aa2ab>,
       __position=<optimized out>, this=0x7f030ea6c9c0) at /usr/include/c++/4.8.2/bits/stl_map.h:657
   **#12 pulsar::BatchAcknowledgementTracker::receivedMessage (this=this@entry=0x7f030ea6c998, message=warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'
   warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'**
   ---Type <return> to continue, or q <return> to quit---
   @0x7f00ebffb350: {impl_ = std::shared_ptr (count 1, weak 0) 0x7f030f6a7148})
       at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc:61
   #13 0x00007f0183113a2e in pulsar::ConsumerImpl::receiveIndividualMessagesFromBatch (this=this@entry=0x7f030ea6bcc8, cnx=warning: RTTI symbol not found for class 'std::_Sp_counted_ptr<pulsar::ClientConnection*, (__gnu_cxx::_Lock_policy)2>'
   warning: RTTI symbol not found for class 'std::_Sp_counted_ptr<pulsar::ClientConnection*, (__gnu_cxx::_Lock_policy)2>'
   std::shared_ptr (count 7, weak 4) 0x7f030c018fd0, batchedMessage=warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'
   warning: RTTI symbol not found for class 'std::_Sp_counted_ptr_inplace<pulsar::MessageImpl, std::allocator<pulsar::MessageImpl>, (__gnu_cxx::_Lock_policy)2>'
   @0x7f00ebffb350: {impl_ = std::shared_ptr (count 1, weak 0) 0x7f030f6a7148},
       redeliveryCount=0) at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/ConsumerImpl.cc:369
   
   Given that we are doing a close, focus on frame #12. It shows that we are manipulating a red-black tree (the `std::map` inside BatchAcknowledgementTracker), which is not inherently thread-safe, and there are two possibly competing threads:
   
   1) The message-receiving thread, which receives and processes messages via handleRead (many frames further down the stack). The stack above is from this handleRead thread.
   2) The second thread is triggered by Consumer::close(). This thread can interfere with the BatchAcknowledgementTracker, in that it eventually calls clear() on it. The BatchAcknowledgementTracker::clear() method is not guarded by the class's mutex, so clear() can run at the same time as the insertion of a new record. The objects in the tree are then removed while the insert is still traversing the tree, resulting in the core dump above.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   Since the failure rate is roughly 1 in a billion to 1 in a trillion messages, it is not practical to reproduce it by simply waiting for the failure to occur while the software grinds on processing messages and closing consumers.
   
   Create a unit test that runs two threads:
   1) One inserts messages into the BatchAcknowledgementTracker, simulating incoming reads.
   2) The other calls the clear() method on the BatchAcknowledgementTracker.
   
   A core dump should occur fairly quickly.
   
   **Expected behavior**
   Software runs without crashing.
   
   
   **Desktop (please complete the following information):**
    - OS: Centos
   
   **Additional context**
   Code inspection of the BatchAcknowledgementTracker and UnAckedMessageTrackerEnabled classes shows the same problem in both: clear() does not take the mutex and is therefore not thread-safe.
   '
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #7851: pulsar-client-cpp crashes consumer due to thread safty

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #7851:
URL: https://github.com/apache/pulsar/issues/7851#issuecomment-677716498


   Though making `clear()` thread-safe may be better, **I'm not sure the crash is caused by this**.
   
   That's because `clear()` is called only when the connection is established:
   
   ```c++
   void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
       /* ... */
   
       unAckedMessageTrackerPtr_->clear();
       batchAcknowledgementTracker_.clear();
   
       /* ... */
       uint64_t requestId = client->newRequestId();
       SharedBuffer cmd = Commands::newSubscribe(
           topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
           startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition());
       // trigger the `handleCreateConsumer()` method after receiving response from broker
       cnx->sendRequestWithId(cmd, requestId)
           .addListener(
               std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
   }
   ```
   
   The future is completed in `handleCreateConsumer`, which runs after `connectionOpened`:
   
   ```c++
   void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
       static bool firstTime = true;
       if (result == ResultOk) {
           /* ... */
           consumerCreatedPromise_.setValue(shared_from_this());  // getConsumerCreatedFuture() is completed
       } else {
   ```
   
   Then `ClientImpl::handleConsumerCreated` will be called, see:
   
   ```c++
   void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr partitionMetadata,
                                    TopicNamePtr topicName, const std::string& consumerName,
                                    ConsumerConfiguration conf, SubscribeCallback callback) {
       if (result == ResultOk) {
           /* ... */
           // `handleConsumerCreated` is called when getConsumerCreatedFuture() is completed
           consumer->getConsumerCreatedFuture().addListener(
               std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
                         std::placeholders::_2, callback, consumer));
           Lock lock(mutex_);
           consumers_.push_back(consumer);
           lock.unlock();
           consumer->start();
       } else {
   ```
   
   ```c++
   void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                          SubscribeCallback callback, ConsumerImplBasePtr consumer) {
       callback(result, Consumer(consumer));  // Finally, the consumer argument of `Client::subscribe` is set to this consumer
   }
   ```
   
   Therefore, since the consumer can only call `acknowledge` or `receive` after `subscribe` has returned, the race condition cannot happen even if the trackers' `clear()` is not thread-safe.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai closed issue #7851: pulsar-client-cpp crashes consumer due to thread safty

Posted by GitBox <gi...@apache.org>.
jiazhai closed issue #7851:
URL: https://github.com/apache/pulsar/issues/7851


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai commented on issue #7851: pulsar-client-cpp crashes consumer due to thread safty

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #7851:
URL: https://github.com/apache/pulsar/issues/7851#issuecomment-677738722


   Couldn't `ConsumerImpl::connectionOpened` also be called on a reconnection after the connection breaks?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower edited a comment on issue #7851: pulsar-client-cpp crashes consumer due to thread safty

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #7851:
URL: https://github.com/apache/pulsar/issues/7851#issuecomment-677716498


   Though making `clear()` thread-safe may be better, **I'm not sure the crash is caused by this**.
   
   First, a correction: `clear()` is called only when the connection is established:
   
   ```c++
   void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
       /* ... */
   
       unAckedMessageTrackerPtr_->clear();
       batchAcknowledgementTracker_.clear();
   
       /* ... */
       uint64_t requestId = client->newRequestId();
       SharedBuffer cmd = Commands::newSubscribe(
           topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
           startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition());
       // trigger the `handleCreateConsumer()` method after receiving response from broker
       cnx->sendRequestWithId(cmd, requestId)
           .addListener(
               std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
   }
   ```
   
   The future is completed in `handleCreateConsumer`, which runs after `connectionOpened`:
   
   ```c++
   void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
       static bool firstTime = true;
       if (result == ResultOk) {
           /* ... */
           consumerCreatedPromise_.setValue(shared_from_this());  // getConsumerCreatedFuture() is completed
       } else {
   ```
   
   Then `ClientImpl::handleConsumerCreated` will be called, see:
   
   ```c++
   void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr partitionMetadata,
                                    TopicNamePtr topicName, const std::string& consumerName,
                                    ConsumerConfiguration conf, SubscribeCallback callback) {
       if (result == ResultOk) {
           /* ... */
           // `handleConsumerCreated` is called when getConsumerCreatedFuture() is completed
           consumer->getConsumerCreatedFuture().addListener(
               std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
                         std::placeholders::_2, callback, consumer));
           Lock lock(mutex_);
           consumers_.push_back(consumer);
           lock.unlock();
           consumer->start();
       } else {
   ```
   
   ```c++
   void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                          SubscribeCallback callback, ConsumerImplBasePtr consumer) {
       callback(result, Consumer(consumer));  // Finally, the consumer argument of `Client::subscribe` is set to this consumer
   }
   ```
   
   Therefore, since the consumer can only call `acknowledge` or `receive` after `subscribe` has returned, the race condition cannot happen even if the trackers' `clear()` is not thread-safe.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org