You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/24 09:58:32 UTC

[GitHub] [pulsar] ThomasPJordan opened a new issue #14448: Call to subscribe() with list of topics causes seg fault if Pulsar broker is started after

ThomasPJordan opened a new issue #14448:
URL: https://github.com/apache/pulsar/issues/14448


   **Describe the bug**
   
   - Only seen with list of topics (multi-topic subscription) being passed as the `topic` argument to `subscribe()`; not seen with single topic string as `topic` argument
   - Only seen when Pulsar service starts up after application has already attempted repeatedly to subscribe, i.e., starting the application and calling `subscribe()` works fine with topic list arg if all takes place after Pulsar service has already started up
   - Logging shows failure to connect to broker before the broker has been started, due to `ConnectError` as expected, but then when broker appears on the network, connection succeeds to broker but there is an immediate segfault
   - Using the Apache Pulsar Python client v 2.8.0
   - Repeatable, happens every time
   
   Here is the logging, note that the call to `subscribe()` is performed by the application in a loop so there are repeated attempts in the case of failure to subscribe due to `ConnectError `when the broker is absent:
   
   Actual topic and consumer names have been redacted.
   
   ```bash
   ####snip
   2022-02-23 12:33:26.620 INFO  [140494236182336] ConnectionPool:84 | Created connection for pulsar+ssl://172.17.0.3:6651/
   2022-02-23 12:33:26.621 ERROR [140494072645376] ClientConnection:422 | [<none> -> pulsar+ssl://172.17.0.3:6651/] Failed to establish connection: Connection refused
   2022-02-23 12:33:26.621 INFO  [140494072645376] ClientConnection:1446 | [<none> -> pulsar+ssl://172.17.0.3:6651/] Connection closed
   2022-02-23 12:33:26.621 ERROR [140494072645376] MultiTopicsConsumerImpl:142 | Error Checking/Getting Partition Metadata while MultiTopics Subscribing- [Muti Topics Consumer: TopicName - persistent://topic/name/redacted-TopicsConsumerFakeName-2ebN84bd9c - Subscription - consumer_name_redacted] result: ConnectError
   2022-02-23 12:33:26.621 ERROR [140494072645376] MultiTopicsConsumerImpl:91 | Failed when subscribed to topic persistent://topic/name/redacted in TopicsConsumer. Error - ConnectError
   2022-02-23 12:33:26.621 ERROR [140494072645376] MultiTopicsConsumerImpl:101 | Unable to create Consumer - [Muti Topics Consumer: TopicName - persistent://topic/name/redacted-TopicsConsumerFakeName-2ebN84bd9c - Subscription - consumer_name_redacted] Error - ConnectError
   2022-02-23 12:33:26.621 INFO  [140494072645376] ClientConnection:261 | [<none> -> pulsar+ssl://172.17.0.3:6651/] Destroyed connection
   2022-02-23 12:33:27.624 INFO  [140494236182336] ConnectionPool:74 | Deleting stale connection from pool for pulsar+ssl://172.17.0.3:6651/ use_count: -1 @ 0
   2022-02-23 12:33:27.624 INFO  [140494236182336] ConnectionPool:84 | Created connection for pulsar+ssl://172.17.0.3:6651/
   2022-02-23 12:33:27.625 ERROR [140494072645376] ClientConnection:422 | [<none> -> pulsar+ssl://172.17.0.3:6651/] Failed to establish connection: Connection refused
   2022-02-23 12:33:27.625 INFO  [140494072645376] ClientConnection:1446 | [<none> -> pulsar+ssl://172.17.0.3:6651/] Connection closed
   2022-02-23 12:33:27.625 ERROR [140494072645376] MultiTopicsConsumerImpl:142 | Error Checking/Getting Partition Metadata while MultiTopics Subscribing- [Muti Topics Consumer: TopicName - persistent://topic/name/redacted-TopicsConsumerFakeName-9e31eb5eNN - Subscription - consumer_name_redacted] result: ConnectError
   2022-02-23 12:33:27.625 ERROR [140494072645376] MultiTopicsConsumerImpl:91 | Failed when subscribed to topic persistent://topic/name/redacted in TopicsConsumer. Error - ConnectError
   2022-02-23 12:33:27.625 ERROR [140494072645376] MultiTopicsConsumerImpl:101 | Unable to create Consumer - [Muti Topics Consumer: TopicName - persistent://topic/name/redacted-TopicsConsumerFakeName-9e31eb5eNN - Subscription - consumer_name_redacted] Error - ConnectError
   2022-02-23 12:33:27.625 INFO  [140494072645376] ClientConnection:261 | [<none> -> pulsar+ssl://172.17.0.3:6651/] Destroyed connection
   2022-02-23 12:33:28.627 INFO  [140494236182336] ConnectionPool:74 | Deleting stale connection from pool for pulsar+ssl://172.17.0.3:6651/ use_count: -1 @ 0
   2022-02-23 12:33:28.628 INFO  [140494236182336] ConnectionPool:84 | Created connection for pulsar+ssl://172.17.0.3:6651/
   2022-02-23 12:33:28.628 INFO  [140494072645376] ClientConnection:372 | [172.17.0.1:52782 -> 172.17.0.3:6651] Connected to broker
   2022-02-23 12:33:30.525 INFO  [140494072645376] HandlerBase:55 | [persistent://topic/name/redacted, consumer_name_redacted, 0] Getting connection from pool
   2022-02-23 12:33:30.647 ERROR [140494072645376] ClientConnection:910 | [172.17.0.1:52782 -> 172.17.0.3:6651] Failed lookup req_id: 2 error: ServiceUnitNotReady
   2022-02-23 12:33:30.647 ERROR [140494072645376] MultiTopicsConsumerImpl:91 | Failed when subscribed to topic persistent://topic/name/redacted in TopicsConsumer. Error - ConnectError
   2022-02-23 12:33:30.647 ERROR [140494072645376] MultiTopicsConsumerImpl:101 | Unable to create Consumer - [Muti Topics Consumer: TopicName - persistent://topic/name/redacted-TopicsConsumerFakeName-e6a0ab6054 - Subscription - consumer_name_redacted] Error - ConnectError
   2022-02-23 12:33:30.647 ERROR [140494072645376] MultiTopicsConsumerImpl:412 | Closing the consumer failed for partition - persistent://topic/name/redacted with error - AlreadyClosed
   Segmentation fault (core dumped)
   ```
   
   Here is the relevant segment of application code which calls subscribe
   
   ```python
       def handle_message(self, consumer, message):
           self._logger.info(f"Received msg from {message.topic_name()}")
   
       def register_multitopic_listeners(self, topics_list, name, **kwargs):
           consumer = self._client.subscribe(
               topics_list, name,
               consumer_type=pulsar.ConsumerType.Shared,
               message_listener=self.handle_message, **kwargs)
           return consumer
   ```
   
   - The listener is a simple stub to eliminate possibility something sbusequent in the real listener application code was causing the problem.
   - `register_multitopic_listeners()` is called repeatedly in a loop containing try/except until succeeds
   
   It looks like the problem is in the underlying C++ library that the Python library serves as a wrapper for. 
   
   I ran the application in `gdb` in the same scenario and saw the following:
   
   ```bash
   ####snip
   2022-02-23 16:27:52.747 INFO  [140737190545152] ClientConnection:372 | [172.17.0.1:42206 -> 172.17.0.3:6651] Connected to broker
   [New Thread 0x7fffed3fa700 (LWP 272)]
   2022-02-23 16:27:55.034 INFO  [140737190545152] HandlerBase:55 | [persistent://topic/name/redacted, consumer_name_redacted, 0] Getting connection from pool
   2022-02-23 16:27:55.199 ERROR [140737190545152] ClientConnection:910 | [172.17.0.1:42206 -> 172.17.0.3:6651] Failed lookup req_id: 2 error: ServiceUnitNotReady
   2022-02-23 16:27:55.199 ERROR [140737190545152] MultiTopicsConsumerImpl:91 | Failed when subscribed to topic persistent://topic/name/redacted in TopicsConsumer. Error - ConnectError
   2022-02-23 16:27:55.199 ERROR [140737190545152] MultiTopicsConsumerImpl:101 | Unable to create Consumer - [Muti Topics Consumer: TopicName - persistent://topic/name/redacted-TopicsConsumerFakeName-ba9b7c85e1 - Subscription - consumer_name_redacted] Error - ConnectError
   2022-02-23 16:27:55.199 ERROR [140737190545152] MultiTopicsConsumerImpl:412 | Closing the consumer failed for partition - persistent://topic/name/redacted with error - AlreadyClosed
   
   Program received signal SIGSEGV, Segmentation fault.
   [Switching to Thread 0x7fffee3fc700 (LWP 270)]
   0x00007fffef4709ce in pulsar::MultiTopicsConsumerImpl::closeAsync(std::function<void (pulsar::Result)>) (this=0xb04948, callback=...) at /pulsar/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:389
   389	/pulsar/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc: No such file or directory.
   ```
   
   and got a backtrace from gdb:
   
   ```bash
   #0  0x00007fffef4709ce in pulsar::MultiTopicsConsumerImpl::closeAsync(std::function<void (pulsar::Result)>) (this=0xb04948, callback=...) at /pulsar/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:389
   #1  0x00007fffef472970 in pulsar::MultiTopicsConsumerImpl::handleOneTopicSubscribed (this=0xb04948, result=pulsar::ResultConnectError, consumer=..., topic=..., topicsNeedCreate=...)
       at /pulsar/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:103
   #2  0x00007fffef478699 in _M_call<std::shared_ptr<pulsar::MultiTopicsConsumerImpl>&, pulsar::Result, pulsar::Consumer const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> >&, std::shared_ptr<std::atomic<int> >&> (__ptr=..., this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:558
   #3  operator()<std::shared_ptr<pulsar::MultiTopicsConsumerImpl>&, pulsar::Result, const pulsar::Consumer&, std::basic_string<char, std::char_traits<char>, std::allocator<char> >&, std::shared_ptr<std::atomic<int> >&, void> (__object=..., this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:610
   #4  __call<void, pulsar::Result&&, const pulsar::Consumer&, 0ul, 1ul, 2ul, 3ul, 4ul> (__args=..., this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:1296
   #5  operator()<pulsar::Result, const pulsar::Consumer&, void> (this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:1355
   #6  std::_Function_handler<void (pulsar::Result, pulsar::Consumer const&), std::_Bind<std::_Mem_fn<void (pulsar::MultiTopicsConsumerImpl::*)(pulsar::Result, pulsar::Consumer, std::string const&, std::shared_ptr<std::atomic<int> >)> (std::shared_ptr<pulsar::MultiTopicsConsumerImpl>, std::_Placeholder<1>, std::_Placeholder<2>, std::string, std::shared_ptr<std::atomic<int> >)> >::_M_invoke(std::_Any_data const&, pulsar::Result, pulsar::Consumer const&) (__functor=..., __args#0=<optimized out>, __args#1=...) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:2071
   #7  0x00007fffef47a1ef in pulsar::Promise<pulsar::Result, pulsar::Consumer>::setFailed (this=<optimized out>, result=pulsar::ResultConnectError) at /pulsar/pulsar-client-cpp/lib/Future.h:130
   #8  0x00007fffef473450 in pulsar::MultiTopicsConsumerImpl::handleSingleConsumerCreated (this=0xb04948, result=pulsar::ResultConnectError, consumerImplBaseWeakPtr=..., partitionsNeedCreate=..., 
       topicSubResultPromise=...) at /pulsar/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:212
   #9  0x00007fffef479969 in _M_call<std::shared_ptr<pulsar::MultiTopicsConsumerImpl>&, pulsar::Result, std::weak_ptr<pulsar::ConsumerImplBase> const&, std::shared_ptr<std::atomic<int> >&, std::shared_ptr<pulsar::Promise<pulsar::Result, pulsar::Consumer> >&> (__ptr=..., this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:558
   #10 operator()<std::shared_ptr<pulsar::MultiTopicsConsumerImpl>&, pulsar::Result, const std::weak_ptr<pulsar::ConsumerImplBase>&, std::shared_ptr<std::atomic<int> >&, std::shared_ptr<pulsar::Promise<pulsar::Result, pulsar::Consumer> >&, void> (__object=..., this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:610
   #11 __call<void, pulsar::Result&&, const std::weak_ptr<pulsar::ConsumerImplBase>&, 0ul, 1ul, 2ul, 3ul, 4ul> (__args=..., this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:1296
   #12 operator()<pulsar::Result, const std::weak_ptr<pulsar::ConsumerImplBase>&, void> (this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:1355
   #13 std::_Function_handler<void (pulsar::Result, std::weak_ptr<pulsar::ConsumerImplBase> const&), std::_Bind<std::_Mem_fn<void (pulsar::MultiTopicsConsumerImpl::*)(pulsar::Result, std::weak_ptr<pulsar::ConsumerImplBase>, std::shared_ptr<std::atomic<int> >, std::shared_ptr<pulsar::Promise<pulsar::Result, pulsar::Consumer> >)> (std::shared_ptr<pulsar::MultiTopicsConsumerImpl>, std::_Placeholder<1>, std::_Placeholder<2>, std::shared_ptr<std::atomic<int> >, std::shared_ptr<pulsar::Promise<pulsar::Result, pulsar::Consumer> >)> >::_M_invoke(std::_Any_data const&, pulsar::Result, std::weak_ptr<pulsar::ConsumerImplBase> const&) (
       __functor=..., __args#0=<optimized out>, __args#1=...) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:2071
   #14 0x00007fffef44d56f in pulsar::Promise<pulsar::Result, std::weak_ptr<pulsar::ConsumerImplBase> >::setFailed (this=this@entry=0x7fffe00028b8, result=pulsar::ResultConnectError)
       at /pulsar/pulsar-client-cpp/lib/Future.h:130
   #15 0x00007fffef445c36 in pulsar::ConsumerImpl::connectionFailed (this=0x7fffe0001c18, result=<optimized out>) at /pulsar/pulsar-client-cpp/lib/ConsumerImpl.cc:200
   #16 0x00007fffef4639d5 in pulsar::HandlerBase::handleNewConnection (result=pulsar::ResultConnectError, connection=..., weakHandler=...) at /pulsar/pulsar-client-cpp/lib/HandlerBase.cc:79
   #17 0x00007fffef464464 in __call<void, pulsar::Result&&, const std::weak_ptr<pulsar::ClientConnection>&, 0ul, 1ul, 2ul> (__args=..., this=<optimized out>)
       at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:1296
   #18 operator()<pulsar::Result, const std::weak_ptr<pulsar::ClientConnection>&, void> (this=<optimized out>) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:1355
   #19 std::_Function_handler<void (pulsar::Result, std::weak_ptr<pulsar::ClientConnection> const&), std::_Bind<void (*(std::_Placeholder<1>, std::_Placeholder<2>, std::weak_ptr<pulsar::HandlerBase>))(pulsar::Result, std::weak_ptr<pulsar::ClientConnection>, std::weak_ptr<pulsar::HandlerBase>)> >::_M_invoke(std::_Any_data const&, pulsar::Result, std::weak_ptr<pulsar::ClientConnection> const&) (__functor=..., 
       __args#0=<optimized out>, __args#1=...) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:2071
   #20 0x00007fffef42bf4f in pulsar::Promise<pulsar::Result, std::weak_ptr<pulsar::ClientConnection> >::setFailed (this=this@entry=0x7fffee3fad90, result=pulsar::ResultConnectError)
       at /pulsar/pulsar-client-cpp/lib/Future.h:130
   #21 0x00007fffef4232b8 in pulsar::ClientImpl::handleLookup (this=<optimized out>, result=<optimized out>, data=..., promise=...) at /pulsar/pulsar-client-cpp/lib/ClientImpl.cc:416
   #22 0x00007fffef4298dc in _M_call<std::shared_ptr<pulsar::ClientImpl>&, pulsar::Result, std::shared_ptr<pulsar::LookupDataResult> const&, pulsar::Promise<pulsar::Result, std::weak_ptr<pulsar::ClientConnection> >&> (__ptr=..., this=0x7fffe0006e00) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:558
   #23 operator()<std::shared_ptr<pulsar::ClientImpl>&, pulsar::Result, const std::shared_ptr<pulsar::LookupDataResult>&, pulsar::Promise<pulsar::Result, std::weak_ptr<pulsar::ClientConnection> >&, void> (
       __object=..., this=0x7fffe0006e00) at /opt/rh/devtoolset-2/root/usr/include/c++/4.8.2/functional:610
   
   ####snip
   ```
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Start up Python application that calls subscribe() with list of topics
   2. Observe that it continually tries and fails to connect to Pulsar broker as per logging above
   3. Start up Pulsar service
   4. After a short while, get segfault in the Pulsar library
   
   **Expected behavior**
   I'd expect to be able to call subscribe() repeatedly and if the Pulsar broker is not up get a ConnectError each time, then once the Pulsar broker is up, I'd expect subscribe to connect successfully after a few attempts start receiving data sent to the subscribed topics.
   
   **Screenshots**
   If applicable, add screenshots to help explain your problem.
   
   **Desktop (please complete the following information):**
    - OS: [e.g. iOS]
   
   **Additional context**
   Add any other context about the problem here.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org