You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/03/07 10:15:52 UTC

[GitHub] [pulsar] Brenda-Rickards-Bose opened a new issue #3779: Pulsar Client: Ack Over Closed Connection With Broker

Brenda-Rickards-Bose opened a new issue #3779: Pulsar Client: Ack Over Closed Connection With Broker
URL: https://github.com/apache/pulsar/issues/3779
 
 
   **Describe the bug**
   This is an issue with the Python client. In the case that the connection to the broker node has been closed unexpectedly, when `consumer.acknowledge` is called the client (sometimes?) tries to send an ack over the closed connection.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   We haven't been able to reliably reproduce this behavior. We've observed it in the wild with a shared subscription.
   
   **Expected behavior**
   The client should reestablish a connection with the broker and then send the ack.
   
   **Desktop:**
    - OS: macOS Mojave Version 10.14
   - Python 3.7.0
   - pulsar-client 2.2.0
   
   **Additional context: Pseudocode**
   ```
   def setup_pulsar():
       consumer_cert = PATH_TO_PULSAR_CONSUMER_CERT
       consumer_key = PATH_TO_PULSAR_CONSUMER_KEY
       ca_cert = PATH_TO_PULSAR_CA_CERT
   
       auth = pulsar.AuthenticationTLS(consumer_cert, consumer_key)
   
       topics = ["persistent://qq/qq-core/notify.one",
                 "persistent://qq/qq-core/notify.two",
                 "persistent://qq/qq-core/notify.three"]
   
       sub_name = "msg-processor-subscription"
   
       client = pulsar.Client(PULSAR_BROKER_URL,
                               tls_trust_certs_file_path = ca_cert,
                               authentication = auth,
                               use_tls = True,
                               tls_allow_insecure_connection = False
       )
   
       consumer = client.subscribe(topics,
                                   subscription_name=sub_name,
                                   receiver_queue_size=0,
                                   consumer_type=pulsar.ConsumerType.Shared)
       return client, consumer
   
   
   def consume_messages():
       client, consumer = setup_pulsar()
       while True:
           msg = consumer.receive()
           MessageProcessor().process_message(msg)
           consumer.acknowledge(msg)
   
   
   if __name__ == '__main__':
       consume_messages()
   ```
   
   **Additional context: Log Snippet**
   
   ```
   2019-02-20 21:48:26.580 INFO  ClientConnection:1237 | [a.a.a.a:38452 -> b.b.b.b:6651] Connection closed
                   MSG PROCESSOR [2019-02-20 21:48:26] INFO     Message Topic: notify.one; Message Environment: stage
   2019-02-20 21:48:26.580 INFO  HandlerBase:129 | [persistent://qq/qq-core/notify.one, msg-processor-subscription, 0] Schedule reconnection in 0.1 s
   2019-02-20 21:48:26.580 INFO  HandlerBase:129 | [persistent://qq/qq-core/notify.two, msg-processor-subscription, 1] Schedule reconnection in 0.1 s
   2019-02-20 21:48:26.581 INFO  HandlerBase:129 | [persistent://qq/qq-core/notify.three, msg-processor-subscription, 2] Schedule reconnection in 0.1 s
                   MSG PROCESSOR [2019-02-20 21:48:26] INFO     This message is not in your environment (integration), the env is stage
   2019-02-20 21:48:26.581 WARN  ClientConnection:1122 | [a.a.a.a:38452 -> b.b.b.b:6651] Could not send message on connection: system:9 Bad file descriptor
   2019-02-20 21:48:26.581 INFO  ClientConnection:1237 | [a.a.a.a:38452 -> b.b.b.b:6651] Connection closed
   2019-02-20 21:48:26.581 INFO  ClientConnection:195 | [a.a.a.a:38452 -> b.b.b.b:6651] Destroyed connection
                   MSG PROCESSOR [2019-02-20 21:48:26] ERROR    MSG_PROCESSING_ERROR: Failed to Ack Message Traceback (most recent call last):
     File "/app/worker/consumer.py", line 60, in consume_messages
       consumer.acknowledge(msg)
     File "/usr/local/lib/python3.7/site-packages/pulsar/__init__.py", line 814, in acknowledge
       self._consumer.acknowledge(message)
   Exception: Pulsar error: NotConnected
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services