Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/03/07 10:15:52 UTC
[GitHub] [pulsar] Brenda-Rickards-Bose opened a new issue #3779: Pulsar Client: Ack Over Closed Connection With Broker
URL: https://github.com/apache/pulsar/issues/3779
**Describe the bug**
This issue affects the Python client. When the connection to the broker has been closed unexpectedly, a call to `consumer.acknowledge` sometimes (we have not determined under which conditions) tries to send the ack over the closed connection, and the call fails with `Pulsar error: NotConnected`.
**To Reproduce**
We do not have reliable reproduction steps yet. We have only observed this behavior in the wild, on a consumer with a shared subscription.
**Expected behavior**
The client should reestablish a connection with the broker and then send the ack.
**Desktop:**
- OS: macOS Mojave Version 10.14
- Python 3.7.0
- pulsar-client 2.2.0
**Additional context: Pseudocode**
```
import pulsar

# PATH_TO_*, PULSAR_BROKER_URL, and MessageProcessor are placeholders
# for application-specific values that are not shown here.


def setup_pulsar():
    consumer_cert = PATH_TO_PULSAR_CONSUMER_CERT
    consumer_key = PATH_TO_PULSAR_CONSUMER_KEY
    ca_cert = PATH_TO_PULSAR_CA_CERT

    # TLS authentication: the client presents its own certificate and key.
    auth = pulsar.AuthenticationTLS(consumer_cert, consumer_key)

    topics = [
        "persistent://qq/qq-core/notify.one",
        "persistent://qq/qq-core/notify.two",
        "persistent://qq/qq-core/notify.three",
    ]
    sub_name = "msg-processor-subscription"

    client = pulsar.Client(
        PULSAR_BROKER_URL,
        tls_trust_certs_file_path=ca_cert,
        authentication=auth,
        use_tls=True,
        tls_allow_insecure_connection=False,
    )
    # One shared subscription spanning all three topics.
    consumer = client.subscribe(
        topics,
        subscription_name=sub_name,
        receiver_queue_size=0,
        consumer_type=pulsar.ConsumerType.Shared,
    )
    return client, consumer


def consume_messages():
    client, consumer = setup_pulsar()
    while True:
        msg = consumer.receive()
        MessageProcessor().process_message(msg)
        # This is the call that raised "Pulsar error: NotConnected".
        consumer.acknowledge(msg)


if __name__ == '__main__':
    consume_messages()
```
**Additional context: Log Snippet**
```
2019-02-20 21:48:26.580 INFO ClientConnection:1237 | [a.a.a.a:38452 -> b.b.b.b:6651] Connection closed
MSG PROCESSOR [2019-02-20 21:48:26] INFO Message Topic: notify.one; Message Environment: stage
2019-02-20 21:48:26.580 INFO HandlerBase:129 | [persistent://qq/qq-core/notify.one, msg-processor-subscription, 0] Schedule reconnection in 0.1 s
2019-02-20 21:48:26.580 INFO HandlerBase:129 | [persistent://qq/qq-core/notify.two, msg-processor-subscription, 1] Schedule reconnection in 0.1 s
2019-02-20 21:48:26.581 INFO HandlerBase:129 | [persistent://qq/qq-core/notify.three, msg-processor-subscription, 2] Schedule reconnection in 0.1 s
MSG PROCESSOR [2019-02-20 21:48:26] INFO This message is not in your environment (integration), the env is stage
2019-02-20 21:48:26.581 WARN ClientConnection:1122 | [a.a.a.a:38452 -> b.b.b.b:6651] Could not send message on connection: system:9 Bad file descriptor
2019-02-20 21:48:26.581 INFO ClientConnection:1237 | [a.a.a.a:38452 -> b.b.b.b:6651] Connection closed
2019-02-20 21:48:26.581 INFO ClientConnection:195 | [a.a.a.a:38452 -> b.b.b.b:6651] Destroyed connection
MSG PROCESSOR [2019-02-20 21:48:26] ERROR MSG_PROCESSING_ERROR: Failed to Ack Message Traceback (most recent call last):
  File "/app/worker/consumer.py", line 60, in consume_messages
    consumer.acknowledge(msg)
  File "/usr/local/lib/python3.7/site-packages/pulsar/__init__.py", line 814, in acknowledge
    self._consumer.acknowledge(message)
Exception: Pulsar error: NotConnected
```
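As an application-side stopgap, one option is to retry the ack after a short pause, so that the reconnection the client schedules (0.1 s in the log above) has a chance to complete. The following is a minimal sketch under that assumption. `acknowledge_with_retry` and its parameters are hypothetical names, not part of pulsar-client, and it is not confirmed here that a retry succeeds once the connection is back. It catches the plain `Exception` seen in the traceback above.

```python
import time


def acknowledge_with_retry(consumer, msg, attempts=5, delay_s=0.2, sleep=time.sleep):
    """Call consumer.acknowledge(msg), pausing and retrying if it raises.

    Hypothetical helper (not part of pulsar-client). Returns the number of
    attempts used; re-raises the last error if every attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            consumer.acknowledge(msg)
            return attempt
        except Exception as err:
            # The traceback above shows a plain Exception
            # ("Pulsar error: NotConnected"), so that is what is caught here.
            last_error = err
            if attempt < attempts:
                # Assumption: pausing gives the scheduled reconnection
                # time to finish before the next attempt.
                sleep(delay_s)
    raise last_error
```

In the consume loop, `consumer.acknowledge(msg)` would become `acknowledge_with_retry(consumer, msg)`. If every attempt fails, the message stays unacknowledged and may be redelivered on the shared subscription, so message processing should tolerate duplicates.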