You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by areddy <ar...@better.com> on 2018/09/21 15:15:57 UTC

Race condition in MessagingHandler

Hello, i'm using MessagingHandler to consume messages off of activemq. This
is my configuration

1. prefetch = 0 and I only process 1 message at a time.
2. auto_accept=False.
3. on_message creates a new thread on every message to "consume" the message

I've noticed that after a random number of messages, 2 disposition frames
are sent out. One without the "settled" flag and one with. This causes the
broker to send it's own disposition frame back, after which the receiver
stops receiving messages.

<code>
class TestHandler(MessagingHandler):

    def on_message(self, event):

        def invoke_callback(callback, message):
            sleep(0.1) # <----- Resolves the race condition
            callback(message) # <---- Calls delivery.accept or
delivery.reject

        # Kick off a thread to process the message,
        # so that the event thread can continue on
        Thread(
            target=invoke_callback,
            daemon=True,
            args=(
                self._receiver_callback,
                event.message.body)).start()
</code>

*Output*

[0x7fe9cd728950]:0 -> @disposition(21) [role=true, first=4, settled=true,
state=@accepted(36) []]
[0x7fe9cd728950]:0 <- @transfer(20) [handle=0, delivery-id=5,
delivery-tag=b"\x01", message-format=0] (169)
"\x00SpE\x00SsE\x00Sw�\x9c{"payload": {"id": 3767}, "createdAt":
"2018-09-21T13:50:37.135720", "name": "testJob", "id":
"3d4ef5d7-c006-420e-b786-c79584e97899", "correlationId": null}"
2018-09-21 09:56.06 Queue.on_receive              
job=3d4ef5d7-c006-420e-b786-c79584e97899
<better_queue.job_envelope.JobEnvelope object at 0x10bb6c6a0>
*[0x7fe9cd728950]:0 -> @disposition(21) [role=true, first=5,
state=@accepted(36) []]
[0x7fe9cd728950]:0 -> @disposition(21) [role=true, first=5, settled=true,
state=@accepted(36) []]*
2018-09-21 09:56.06 better_queue.processed        
job_id=3d4ef5d7-c006-420e-b786-c79584e97899
[0x7fe9cd728950]:0 <- @disposition(21) [role=false, first=5, last=5,
settled=true, state=@accepted(36) []]


Is this a bug ? Or am I using the API incorrectly ?




--
Sent from: http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: Race condition in MessagingHandler

Posted by Gordon Sim <gs...@redhat.com>.
On 21/09/18 16:38, areddy wrote:
> Thank you for confirming that and providing an alternative. It looks like the
> EventInjector is the correct way to handle this situation, is that correct ?

Yes, that is what it is intended for to allow actions from other threads 
to be injected as events into the event lopp thread where they can 
safely operate on any of the connections managed by that loop.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: Race condition in MessagingHandler

Posted by areddy <ar...@better.com>.
Thank you for confirming that and providing an alternative. It looks like the
EventInjector is the correct way to handle this situation, is that correct ?



--
Sent from: http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: Race condition in MessagingHandler

Posted by Gordon Sim <gs...@redhat.com>.
On 21/09/18 16:15, areddy wrote:
> Hello, i'm using MessagingHandler to consume messages off of activemq. This
> is my configuration
> 
> 1. prefetch = 0 and I only process 1 message at a time.
> 2. auto_accept=False.
> 3. on_message creates a new thread on every message to "consume" the message
> 
> I've noticed that after a random number of messages, 2 disposition frames
> are sent out. One without the "settled" flag and one with. This causes the
> broker to send it's own disposition frame back, after which the receiver
> stops receiving messages.

[...]

> Is this a bug ? Or am I using the API incorrectly ?

You are using the API incorrectly. Connections are not threadsafe so it 
is not safe to invoke on them from one thread while the event thread is 
processing them.

One approach is that used by the db_send/db_recv examples: 
https://git1-us-west.apache.org/repos/asf/qpid-proton/repo?p=qpid-proton.git;a=blob;f=python/examples/db_recv.py;h=8c7904980914417c7cfb4fa72ce75607da99631a;hb=HEAD

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org