Posted to users@activemq.apache.org by Steve Rice <st...@teamquest.com> on 2015/01/15 02:01:03 UTC

how to safely delete a cms::MessageListener object

Since the onMessage() function runs in a separate thread, how do I safely
delete a cms::MessageListener object in the main thread using C++11?  If the
reply I am looking for never comes, I want to give up on listening for it
and throw away this object.  But what if in the process of deleting this
object in the main thread, the onMessage() function gets called in the
separate thread?  In other words, how do I stop listening for messages and
clean up safely from the main thread?

Steve Rice




--
View this message in context: http://activemq.2283324.n4.nabble.com/how-to-safely-delete-a-cms-MessageListener-object-tp4689939.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: how to safely delete a cms::MessageListener object

Posted by Timothy Bish <ta...@gmail.com>.
On 01/14/2015 08:01 PM, Steve Rice wrote:
> Since the onMessage() function runs in a separate thread, how do I safely
> delete a cms::MessageListener object in the main thread using C++11?  If the
> reply I am looking for never comes, I want to give up on listening for it
> and throw away this object.  But what if in the process of deleting this
> object in the main thread, the onMessage() function gets called in the
> separate thread?  In other words, how do I stop listening for messages and
> clean up safely from the main thread?
The safest way is to close the consumer; then you know it won't dispatch
any more messages.  If you want to continue to use that consumer, then you
need to stop the session to ensure that no dispatches will occur, at which
point you can set the message listener to null on the consumer and then
destroy your MessageListener.

-- 
Tim Bish
Sr Software Engineer | RedHat Inc.
tim.bish@redhat.com | www.redhat.com
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/


Re: how to safely delete a cms::MessageListener object

Posted by Tim Bain <tb...@alumni.duke.edu>.
That sounds way better than what I'd proposed.

On Fri, Jan 16, 2015 at 9:18 AM, Timothy Bish <ta...@gmail.com> wrote:

> On 01/15/2015 09:16 PM, artnaseef wrote:
>
>> How about the following instead?  As Tim pointed out, the consumer close
>> won't return until no more calls to onMessage are guaranteed.  I believe
>> that also means it won't return while onMessage() is actively being
>> called.
>>
>> @Tim - can you confirm?
>>
>
> It should be the case that onMessage must always complete prior to close()
> returning, unless you do something silly like call close from onMessage.
> There are two locks at play here that prevent things from going wrong, one
> on the listener and one on the dispatch queue.  This is why it is also safe
> to just call setMessageListener(null) on the consumer as it will also not
> return until no onMessage calls are in progress.  During the time there is
> no set message listener the messages will pile up in the prefetch queue and
> dispatch would start again once a new listener is added.

Re: how to safely delete a cms::MessageListener object

Posted by Timothy Bish <ta...@gmail.com>.
On 01/16/2015 12:09 PM, Steve Rice wrote:
> So if the thread wanting to shut down the Message Listening sets the message
> listener to NULL, the setMessageListener call will not return until there is
> no onMessage function in progress?
>
> And so, after the setMessageListener(NULL) call returns, it is safe to
> delete the MessageListener object?
That is correct.  You can see this in the source of setMessageListener,
which locks on the listenerMutex.  That mutex is held during onMessage
calls, which prevents the problem you are trying to avoid.
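
The locking behaviour described here can be illustrated with a small
self-contained sketch.  It is not the CMS source: MockConsumer, Listener,
dispatch() and runDemo() are hypothetical stand-ins built on the C++11
standard library, and only the idea of a listener mutex held across
onMessage() is taken from the description above.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a cms::MessageListener implementation.
struct Listener {
    std::atomic<bool>& started;
    std::atomic<bool>& finished;
    Listener(std::atomic<bool>& s, std::atomic<bool>& f) : started(s), finished(f) {}
    void onMessage() {
        started = true;
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // simulate work
        finished = true;
    }
};

// Hypothetical stand-in for a consumer; NOT the real CMS class.
class MockConsumer {
public:
    void setMessageListener(Listener* l) {
        // Blocks here while a callback is in progress on the dispatch thread.
        std::lock_guard<std::mutex> lock(listenerMutex);
        listener = l;
    }
    // Runs on the dispatch thread, once per message.
    void dispatch() {
        std::lock_guard<std::mutex> lock(listenerMutex); // held for the whole callback
        if (listener != nullptr) listener->onMessage();
    }
private:
    std::mutex listenerMutex;
    Listener* listener = nullptr;
};

// Returns true if onMessage() had already finished by the time
// setMessageListener(nullptr) returned to the main thread.
bool runDemo() {
    std::atomic<bool> started(false), finished(false);
    MockConsumer consumer;
    Listener* listener = new Listener(started, finished);
    consumer.setMessageListener(listener);

    std::thread dispatcher([&consumer] { consumer.dispatch(); });
    while (!started) std::this_thread::yield(); // wait until onMessage() is running

    consumer.setMessageListener(nullptr); // waits for the in-flight callback
    bool sawFinished = finished;
    delete listener;                      // no callback can be using it now
    dispatcher.join();
    return sawFinished;
}
```

In the mock, the delete is safe only because both paths take the same
mutex.  Calling setMessageListener(nullptr) from inside onMessage() would
deadlock this sketch, since std::mutex is not recursive.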



Re: how to safely delete a cms::MessageListener object

Posted by Steve Rice <st...@teamquest.com>.
So if the thread wanting to shut down the Message Listening sets the message
listener to NULL, the setMessageListener call will not return until there is
no onMessage function in progress?

And so, after the setMessageListener(NULL) call returns, it is safe to
delete the MessageListener object? 

Steve Rice




Re: how to safely delete a cms::MessageListener object

Posted by Timothy Bish <ta...@gmail.com>.
On 01/15/2015 09:16 PM, artnaseef wrote:
> How about the following instead?  As Tim pointed out, the consumer close
> won't return until no more calls to onMessage are guaranteed.  I believe
> that also means it won't return while onMessage() is actively being called.
>
> @Tim - can you confirm?

It should be the case that onMessage must always complete prior to 
close() returning, unless you do something silly like call close from 
onMessage.  There are two locks at play here that prevent things from 
going wrong, one on the listener and one on the dispatch queue.  This is 
why it is also safe to just call setMessageListener(null) on the 
consumer as it will also not return until no onMessage calls are in 
progress.  While no message listener is set, messages will pile up in the
prefetch queue, and dispatch will start again once a new listener is added.





Re: how to safely delete a cms::MessageListener object

Posted by artnaseef <ar...@artnaseef.com>.
How about the following instead?  As Tim pointed out, the consumer close
won't return until no more calls to onMessage are guaranteed.  I believe
that also means it won't return while onMessage() is actively being called.

@Tim - can you confirm?

If that's correct, the following should do the job effectively:

ON MESSAGE
    LOCK semaphore
    IF shutdown started
    THEN
       UNLOCK semaphore
       ABORT
    END IF

    SET processing active
    UNLOCK semaphore

    DO process message

    LOCK semaphore
      SET processing NOT active
    UNLOCK semaphore


ON LISTENER SHUTDOWN
    LOCK semaphore
      SET shutdown started
    UNLOCK semaphore

    DO close consumer

    SET not done
    WHILE not done
      LOCK semaphore
        IF processing NOT active
        THEN
          SET done
        END IF
      UNLOCK semaphore

      IF NOT done
      THEN
        DELAY // short sleep; note there are methods to eliminate the sleep
              // and use a notification; I don't recall them for C++ right now
      END IF
    END WHILE

    DO delete message listener


That last part about the delay can be solved with another semaphore as well. 
Also, if the part up to "ABORT" in the listener can be done safely even
after the listener is deleted, then this works even if the consumer close
can return while the call to onMessage() is still active.
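
For C++11, the pseudocode above might be sketched roughly as follows, with
std::condition_variable taking the place of the sleep loop.  ShutdownGate,
process(), shutdown() and runGateDemo() are hypothetical names, not part of
CMS, and the closeConsumer callback only marks where the real consumer
close would go.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical helper; the names are illustrative and not part of CMS.
class ShutdownGate {
public:
    // Call from onMessage().  Runs `work` and returns true, or returns false
    // without running it if shutdown has already started.
    bool process(const std::function<void()>& work) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (shutdownStarted_) return false; // ABORT; the guard unlocks
            ++active_;                          // SET processing active
        }
        try {
            work();                             // DO process message
        } catch (...) {
            finish();
            throw;
        }
        finish();
        return true;
    }

    // Call from the main thread.  Returns once no callback is processing.
    void shutdown(const std::function<void()>& closeConsumer) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdownStarted_ = true;            // SET shutdown started
        }
        closeConsumer();                        // DO close consumer
        std::unique_lock<std::mutex> lock(mutex_);
        idle_.wait(lock, [this] { return active_ == 0; });
    }

private:
    void finish() {
        std::lock_guard<std::mutex> lock(mutex_);
        --active_;                              // SET processing NOT active
        idle_.notify_all(); // notify while locked, before the waiter can proceed
    }

    std::mutex mutex_;
    std::condition_variable idle_;
    bool shutdownStarted_ = false;
    int active_ = 0;
};

// Usage sketch: returns true if the in-flight work completed before
// shutdown() returned and a later message was refused.
bool runGateDemo() {
    ShutdownGate gate;
    std::atomic<bool> entered(false), completed(false);
    std::thread worker([&] {
        gate.process([&] {
            entered = true;
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            completed = true;
        });
    });
    while (!entered) std::this_thread::yield();
    gate.shutdown([] { /* the real consumer close would go here */ });
    bool finishedFirst = completed;
    bool refused = !gate.process([] {});
    worker.join();
    return finishedFirst && refused;
}
```

As with the pseudocode, the gate only covers callbacks that have reached
the first lock.  It must also outlive any onMessage() call that has been
entered but has not yet reached process().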




Re: how to safely delete a cms::MessageListener object

Posted by Steve Rice <st...@teamquest.com>.
Thanks.  I considered the sleep idea and will probably go that route.

Steve Rice




Re: how to safely delete a cms::MessageListener object

Posted by Tim Bain <tb...@alumni.duke.edu>.
You could probably handle that problem in most cases by adding a counter
that tracks whether onMessage() is currently running (increment on entry,
decrement on exit), combined with a long enough sleep() call.  It's a bit
ham-fisted, but it should be good enough.  Close the consumer (so you know
that there is at most one more message), sleep for your amount of time,
then check whether onMessage() is still running and sleep until it's not
(and maybe a bit more for good measure to make sure that the code that
calls onMessage() gets a chance to finish, too), then delete the listener
object.  The only risk there is if you don't sleep long enough initially
and there's still a message waiting to have onMessage() called, but
hopefully you can find a duration that reduces that risk to close to zero
even if you can't entirely prevent the issue.
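
A minimal sketch of that counter-plus-sleep idea, using only the C++11
standard library.  InFlightCounter, Scope, waitUntilIdle() and
runCounterDemo() are hypothetical names, and the durations are arbitrary
placeholders.  As noted above, the counter cannot see a callback that has
been entered but has not yet constructed its Scope; only the initial sleep
covers that window.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical in-flight counter; not part of CMS.
class InFlightCounter {
public:
    // RAII marker: construct as the first statement of onMessage().
    class Scope {
    public:
        explicit Scope(InFlightCounter& c) : counter_(c) { ++counter_.count_; }
        ~Scope() { --counter_.count_; }
        Scope(const Scope&) = delete;
        Scope& operator=(const Scope&) = delete;
    private:
        InFlightCounter& counter_;
    };

    bool busy() const { return count_.load() > 0; }

private:
    std::atomic<int> count_{0};
};

// Sleeps for `grace` first, then polls until no callback is marked as running.
void waitUntilIdle(const InFlightCounter& counter,
                   std::chrono::milliseconds grace,
                   std::chrono::milliseconds poll) {
    std::this_thread::sleep_for(grace);
    while (counter.busy()) std::this_thread::sleep_for(poll);
}

// Usage sketch: returns true if the simulated callback had completed by the
// time waitUntilIdle() returned.
bool runCounterDemo() {
    InFlightCounter counter;
    std::atomic<bool> entered(false), completed(false);
    std::thread callback([&] {
        InFlightCounter::Scope scope(counter); // first statement of onMessage()
        entered = true;
        std::this_thread::sleep_for(std::chrono::milliseconds(60));
        completed = true;
    });
    while (!entered) std::this_thread::yield(); // demo only: make the run deterministic
    // The real consumer close would go here.
    waitUntilIdle(counter, std::chrono::milliseconds(20), std::chrono::milliseconds(5));
    bool done = completed;
    callback.join();
    return done;
}
```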

A better solution would be to have a method (isSafeToDelete()?) on the
listener that will return false until the listener has been closed and the
last message has been processed and the thread has exited.  That sounds
like a useful enhancement you could request in JIRA...

On Thu, Jan 15, 2015 at 9:40 AM, Steve Rice <st...@teamquest.com> wrote:

> Thanks for the suggestions.  I have a request-and-reply mechanism, where the
> reply arrives asynchronously to a temporary queue, and a consumer has been
> set up to listen for that reply sent to the temporary queue.  At some point,
> after waiting awhile to receive the reply and still not getting it, the main
> thread may decide that it has given up on receiving the reply and wants to
> cleanup the objects that were set up to receive the reply.  It appears I can
> set the message listener associated with the consumer to NULL (or close the
> consumer), which will prevent an incoming reply from calling the onMessage()
> function associated with message listener.  That stops the listening.  The
> case I am concerned about is when the onMessage() function has begun
> executing to process a reply, but has not yet reached a statement where I
> can set a flag to indicate to the main thread that it is executing.  At that
> moment, the main thread does not know that there is a concurrent execution
> of onMessage() and may proceed with de-allocating objects used by
> onMessage().  Perhaps this window of vulnerability can be closed by using
> C++11 shared pointers, but that would require modifying the CMS code.

Re: how to safely delete a cms::MessageListener object

Posted by Steve Rice <st...@teamquest.com>.
Thanks for the suggestions.  I have a request-and-reply mechanism, where the
reply arrives asynchronously to a temporary queue, and a consumer has been
set up to listen for that reply sent to the temporary queue.  At some point,
after waiting awhile to receive the reply and still not getting it, the main
thread may decide that it has given up on receiving the reply and wants to
clean up the objects that were set up to receive the reply.  It appears I can
set the message listener associated with the consumer to NULL (or close the
consumer), which will prevent an incoming reply from calling the onMessage()
function associated with the message listener.  That stops the listening.  The
case I am concerned about is when the onMessage() function has begun
executing to process a reply, but has not yet reached a statement where I
can set a flag to indicate to the main thread that it is executing.  At that
moment, the main thread does not know that there is a concurrent execution
of onMessage() and may proceed with de-allocating objects used by
onMessage().  Perhaps this window of vulnerability can be closed by using
C++11 shared pointers, but that would require modifying the CMS code.

Steve Rice




Re: how to safely delete a cms::MessageListener object

Posted by artnaseef <ar...@artnaseef.com>.
Good question.  In Java, we don't need to worry about the memory allocation -
just the concurrency.  While I don't know the answer, I'll give some input
here.

Have you looked through the CMS code?  Some things that may work:

* Closing the consumer; once the close() method returns, I would not expect
to receive any more messages
* Closing the session or connection for the consumer (also expecting no more
messages after close() completes)
* Set a new message listener on the consumer (expecting no more messages on
the old listener once the new one is in place)

By the way, frequently creating and removing consumers can get expensive
with ActiveMQ, especially in a network of brokers.  So, I recommend
revisiting the design to see if there's a way to avoid doing so.  For
example, if waiting for responses on a shared queue using correlation IDs, a
good approach is to instead have a queue dedicated to each server, have one
permanent consumer on each server, and have a register-and-dispatch from
that consumer to the internal recipient.
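
The register-and-dispatch idea might look something like the sketch below.
ReplyDispatcher and its methods are hypothetical names, not part of CMS or
ActiveMQ.  The sketch assumes the permanent consumer's onMessage() reads the
correlation ID from each message (in CMS, presumably via
getCMSCorrelationID()) and hands it to dispatch() as a string.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical registry; the names are illustrative and not part of CMS.
class ReplyDispatcher {
public:
    typedef std::function<void(const std::string&)> Handler;

    // Register interest in the reply to one request.
    void registerHandler(const std::string& correlationId, Handler handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        handlers_[correlationId] = std::move(handler);
    }

    // Give up waiting for a reply.  Returns true if a handler was removed.
    bool unregisterHandler(const std::string& correlationId) {
        std::lock_guard<std::mutex> lock(mutex_);
        return handlers_.erase(correlationId) > 0;
    }

    // Called from the permanent consumer's onMessage().  Returns false for
    // late or unknown replies, which are simply dropped.
    bool dispatch(const std::string& correlationId, const std::string& body) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::map<std::string, Handler>::iterator it = handlers_.find(correlationId);
        if (it == handlers_.end()) return false;
        Handler handler = std::move(it->second);
        handlers_.erase(it); // one reply per request
        // Runs under the lock, so unregisterHandler() cannot return while the
        // handler is executing.  Handlers must not call back into the
        // dispatcher, or this sketch deadlocks.
        handler(body);
        return true;
    }

private:
    std::mutex mutex_;
    std::map<std::string, Handler> handlers_;
};
```

With this shape, giving up on a reply only removes a map entry.  No
consumer or listener is created or destroyed per request, and once
unregisterHandler() returns, the objects the handler refers to can be
released.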

Hope this helps.




Re: how to safely delete a cms::MessageListener object

Posted by Steve Rice <st...@teamquest.com>.
Awesome.  Thanks, Tim.

Steve Rice


