You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Martin Husbyn (Jira)" <ji...@apache.org> on 2020/11/30 18:26:00 UTC

[jira] [Created] (PROTON-2306) Getting connection inactive error when consuming messages from Azure Service Bus

Martin Husbyn created PROTON-2306:
-------------------------------------

             Summary: Getting connection inactive error when consuming messages from Azure Service Bus
                 Key: PROTON-2306
                 URL: https://issues.apache.org/jira/browse/PROTON-2306
             Project: Qpid Proton
          Issue Type: Bug
          Components: python-binding
    Affects Versions: proton-c-0.31.0
         Environment: The service runs as 2 instances (pods) in a Kubernetes cluster hosted on Azure Kubernetes Service.
            Reporter: Martin Husbyn


Hi,

We currently have a simple service that consumes messages from Azure Service Bus (indefinitely), and based on the contents of those messages, either ignores it or does a POST request to another internal service endpoint for our app. 

However, occasionally we get the following exception:

> The connection was inactive for more than the allowed 240000 milliseconds and is closed by container '5942ad2288f8495aaea81fe91c935445_G10'.

This is then shortly followed by:

> amqp:connection:framing-error: SSL Failure: Unknown error

I've observed that this error happens pretty much exactly 4 minutes (i.e. 240000 ms) after the last time the service processed an incoming message. This suggests to me that it somehow after processing a message, suddenly stops sending heartbeats to the Azure Service Bus to keep the connection open.

 

Any help or pointers in the right direction here would be much appreciated. I've been looking into this for several days without really getting anywhere.

 

Below is what I believe to be the relevant code:

```
class TriggerConsumer(MessagingHandler):
 def __init__(
 self,
 server: str,
 address: str,
 event_filter,
 ):
 super().__init__()
 self._server = server
 self._address = address
 self._event_filter = event_filter

 def on_start(self, event: Event):
 """Called when the event loop starts. See superclass docs for a (tiny) bit more info.

 We use it here to 'set up' our event receiver from the queue.
 """
 logging.info('Starting consumer...')
 # Source: https://www.johndehavilland.com/blog/2017/07/05/Python-ServiceBus-AMQP.html
 conn = event.container.connect(self._server, allowed_mechs='PLAIN')
 event.container.create_receiver(conn, self._address)
 logging.info('Started consumer')

 health_check.set_ready()

 def on_message(self, event: Event):
 logging.info('Receiving message')
 message_id = None
 try:
 message_id = event.message.id
 message_content = json.loads(event.message.body)

 try:
 self._event_filter.check_event(
 event_body=message_content, last_processed_event_time=self._last_processing_time
 )
 except FilterError as e:
 logging.info(f"Event with id '\{message_id}' rejected by event filter: \{e}")
 return

 # Do POST call to separate service in the cluster here
 do_post_call()

 logging.info(f'Successfully processed event with id \{message_id}')

 # Message is not acked if we raise an exception, so to avoid a vicious loop of crashing on the same
 # message we catch all exceptions here.
 except Exception as e:
 logging.error(f'Failed to process event with id \{message_id}: \{e}')
 logging.exception(e)

 return


if __name__ == '__main__':
 logging.info('Starting service')

 # Spawns a separate thread with a simple HTTP server exposing health monitoring endpoints to Docker
 start_http_health_check_endpoint()

 consumer = TriggerConsumer(
 # AMQP connection string, usually in the form of 'amqps://\{keyname}:\{key}@\{queue server name}.servicebus.windows.net'.
 server=Config.AMQP_SERVER_CONN,
 address=Config.QUEUE_NAME,
 event_filter=EventFilter()
 )
 Container(consumer).run()

```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org