You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Francesco Raviglione <fr...@gmail.com> on 2020/06/19 11:08:24 UTC

Qpid Proton Python: sending data to multiple topics only when available from external data source

Dear all,
I'm trying use the Python version of Qpid Proton to perform the following
task, together with a Tornado HTTP server:
1) The Tornado web application should listen to a certain port and wait for
POST requests
2) When a POST request occurs, it should get the received data (which is
divided into two parts: <topic name>-<actual content>), and use it to:
2a) Create a new topic on an ActiveMQ broker (I also think that a new
sender with the topic name as address should be created), with Qpid Proton,
using the first part of the data, if this topic does not exist yet; if it
already exists (i.e. the first part of the data has already been received
in previous POST requests), go to point (2b)
2b) Send the second part of the data to that specific topic.

I'm basically trying to send data to multiple topics, depending on the data
content, sending the messages only when the data becomes available from
another thread/application (in this case, the Tornado web application).

I have tried creating two threads, like in this code snippet:

class AMQPHandler(MessagingHandler):
    def __init__(self, url, thread_event):
        super(AMQPHandler, self).__init__()
        # Save the passed parameters...

    def on_start(self, event):
        # ...

    def send_message(self,sendert):
        # ...

    def on_sendable(self,event):
        self.thread_event.wait()
# Waiting to be unblocked by a POST request? <- if we do this, the
subsequent send() call will not send the message!
        self.thread_event.clear()
# ...

    def on_accepted(self,event):
# ...


class HttpRequestHandler(tornado.web.RequestHandler):
    def initialize(self,thread_event,):
        self.thread_event = thread_event
        # ...

    def get(self):
        # ...

    def post(self):
        data=self.request.body # Get the received data
        # ...
        self.thread_event.set() # Unblock Qpid Proton?

def http_server_loop(thread_event):
    asyncio.set_event_loop(asyncio.new_event_loop())
    application = tornado.web.Application([
        (r"/", HttpRequestHandler, dict(thread_event=thread_event)),
    ])
    application.listen(58888)
    tornado.ioloop.IOLoop.current().start()

def amqp_client_loop(thread_event):
    asyncio.set_event_loop(asyncio.new_event_loop())
    Container(AMQPHandler(url,thread_event)).run()

def main (argv):
# ...
    thread_event = threading.Event()
    t=Thread(target=http_server_loop,args=(thread_event,))
t.start()
    t=Thread(target=amqp_client_loop,args=(thread_event,))
t.start()
# ...

However, I was never able, by writing the code in each callback, to reach
the desired result.
In particular, I'm having issues in trying to make Qpid Proton send a new
message to the broker only when data is available from another application
and I don't know if there is an efficient way to send messages to more than
one topic (involving more than one sender) from within the same
MessagingHandler, creating new topics only when it is needed.

Is there a way to reach the desired result by using the same
MessagingHandler? Which is the best way to "synchronize" the transmission
of AMQP 1.0 messages to a data source which is external to Qpid Proton?

Thank you very much in advance.