Posted to users@qpid.apache.org by RickW <ri...@in-deptheng.com> on 2009/04/14 19:15:27 UTC

Interrupting SubscriptionManager::run()

What is the standard way of using QPID with other event-driven tools that want to control the dispatch loop (like Xt and Qt)?

I was thinking of calling SubscriptionManager::run() periodically, with something like the code below. Can stop() be called from another thread like this (i.e., not from a listener)? Is there a better way?

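void *waitThread(void *arg);

void processQpidMessageForAWhile()
{
  pthread_t thread;
  pthread_create(&thread, NULL, waitThread, NULL);
  mySubscriptionManager.run();   // blocks until stop() is called
}

void *waitThread(void *arg)
{
   usleep(/* some interval */);
   mySubscriptionManager.stop();  // called from the helper thread, not a listener
   return NULL;                   // a void* function must return a value
}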

Thanks in advance,
Rick
-- 
View this message in context: http://n2.nabble.com/Interupting-SubscriptionManager%3A%3Arun%28%29-tp2634272p2634272.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: Interrupting SubscriptionManager::run()

Posted by Gordon Sim <gs...@redhat.com>.
Alan Conway wrote:
> Gordon Sim wrote:
>> RickW wrote:
>>> Follow up to the thread:
>>>
>>> We now use QPID two ways. (1) Message-driven programs willing to relinquish
>>> control permanently to QPID call SubscriptionManager::run(). (2) Programs
>>> that are not totally message-driven (those needing non-message events like
>>> timers or operator input) use LocalQueues. Their main loop checks for other
>>> events and then check the LocalQueues for messages.
>>>
>>> We have been unable to make the non-message-driven programs efficient. The
>>> main loop burns 100% CPU unless we add sleeps, which means we are doing
>>> nothing when there could be messages to process.
>>>
>
> Are you using a 0 timeout on LocalQueue::get? If you use a non-0 timeout
> then qpid shouldn't consume CPU while waiting for a message on a
> LocalQueue.
>
>>> We really need something like a SubscriptionManager::runTimed(interval)
>>> call. It would process messages (or block if none is available) for the
>>> specified interval and then return. Is there some way to do this?
>>
>
> A loop with LocalQueue::get and a timeout should give you a similar result.

That's fine if you only have one subscription. If not, you have to either
spin checking separate queues or block on one for a time (which may
reduce the responsiveness to message delivery on another queue).

Allowing the same LocalQueue to be used for multiple subscriptions would 
be an alternative solution to the problem I believe.
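
A minimal sketch of that fan-in idea, written against the C++ standard library rather than Qpid: several sources push tagged messages into one shared queue, so the consumer needs only a single timed wait. `FanInQueue`, `Tagged`, and their members are hypothetical names for illustration; LocalQueue did not offer this at the time of the thread.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical stand-in types; none of this is part of the Qpid API.
struct Tagged {
    std::string source;  // which subscription produced the message
    std::string body;
};

class FanInQueue {
public:
    // Called by any producer, one per subscription.
    void push(const std::string& source, const std::string& body) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(Tagged{source, body});
        }
        ready_.notify_one();
    }

    // Blocks for at most `timeout`; returns false if nothing arrived.
    bool get(Tagged& out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!ready_.wait_for(lock, timeout, [this] { return !items_.empty(); }))
            return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<Tagged> items_;
};
```

Because every source lands in the same queue, waiting on it never delays delivery from another subscription; the `source` tag is what lets the consumer tell the messages apart afterwards.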



Re: Interrupting SubscriptionManager::run()

Posted by Alan Conway <ac...@redhat.com>.
Gordon Sim wrote:
> RickW wrote:
>> Follow up to the thread:
>>
>> We now use QPID two ways. (1) Message-driven programs willing to relinquish
>> control permanently to QPID call SubscriptionManager::run(). (2) Programs
>> that are not totally message-driven (those needing non-message events like
>> timers or operator input) use LocalQueues. Their main loop checks for other
>> events and then check the LocalQueues for messages.
>>
>> We have been unable to make the non-message-driven programs efficient. The
>> main loop burns 100% CPU unless we add sleeps, which means we are doing
>> nothing when there could be messages to process.
>>

Are you using a 0 timeout on LocalQueue::get? If you use a non-0 timeout then 
qpid shouldn't consume CPU while waiting for a message on a LocalQueue.

>> We really need something like a SubscriptionManager::runTimed(interval)
>> call. It would process messages  (or block if none is available) for the
>> specified interval and then return. Is there some way to do this?
> 

A loop with LocalQueue::get and a timeout should give you a similar result.
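
A rough sketch of such a loop, assuming only that some call can wait for a message with a timeout. The `tryGet` callable stands in for a timed get on a LocalQueue; `runTimed`, `TryGet`, and `Handler` are hypothetical names, and no Qpid signatures are used here.

```cpp
#include <chrono>
#include <functional>
#include <string>

// Hypothetical signatures, chosen for illustration only.
// tryGet(msg, timeout) should block for at most `timeout` and return true
// if it stored a message in `msg`.
using TryGet = std::function<bool(std::string&, std::chrono::milliseconds)>;
using Handler = std::function<void(const std::string&)>;

// Process messages until `interval` has elapsed, then return.
// Returns the number of messages handled.
int runTimed(std::chrono::milliseconds interval,
             const TryGet& tryGet, const Handler& handle) {
    using Clock = std::chrono::steady_clock;
    const Clock::time_point deadline = Clock::now() + interval;
    int handled = 0;
    for (;;) {
        const auto remaining =
            std::chrono::duration_cast<std::chrono::milliseconds>(
                deadline - Clock::now());
        if (remaining <= std::chrono::milliseconds::zero())
            break;  // the interval is used up
        std::string msg;
        // Any waiting happens inside tryGet, so an idle loop does not spin
        // as long as tryGet really blocks for the timeout it is given.
        if (tryGet(msg, remaining)) {
            handle(msg);
            ++handled;
        }
    }
    return handled;
}
```

Passing the remaining time rather than the full interval on each iteration keeps the total run time close to `interval` even when messages arrive steadily.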



Re: Interrupting SubscriptionManager::run()

Posted by Gordon Sim <gs...@redhat.com>.
RickW wrote:
> Follow up to the thread:
> 
> We now use QPID two ways. (1) Message-driven programs willing to relinquish
> control permanently to QPID call SubscriptionManager::run(). (2) Programs
> that are not totally message-driven (those needing non-message events like
> timers or operator input) use LocalQueues. Their main loop checks for other
> events and then check the LocalQueues for messages.
> 
> We have been unable to make the non-message-driven programs efficient. The
> main loop burns 100% CPU unless we add sleeps, which means we are doing
> nothing when there could be messages to process.
> 
> We really need something like a SubscriptionManager::runTimed(interval)
> call. It would process messages  (or block if none is available) for the
> specified interval and then return. Is there some way to do this?

Seems like a reasonable enhancement request (an alternative request
would be to allow a single LocalQueue to be used for multiple
subscriptions). If you want to raise a Jira, we can then figure out the
best fix.



Re: Interrupting SubscriptionManager::run()

Posted by RickW <ri...@in-deptheng.com>.
Follow up to the thread:

We now use QPID two ways. (1) Message-driven programs willing to relinquish
control permanently to QPID call SubscriptionManager::run(). (2) Programs
that are not totally message-driven (those needing non-message events like
timers or operator input) use LocalQueues. Their main loop checks for other
events and then checks the LocalQueues for messages.

We have been unable to make the non-message-driven programs efficient. The
main loop burns 100% CPU unless we add sleeps, which means we are doing
nothing when there could be messages to process.

We really need something like a SubscriptionManager::runTimed(interval)
call. It would process messages  (or block if none is available) for the
specified interval and then return. Is there some way to do this?

Rick
-- 
View this message in context: http://n2.nabble.com/Interupting-SubscriptionManager%3A%3Arun%28%29-tp2634272p3205915.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.



Re: Interrupting SubscriptionManager::run()

Posted by RickW <ri...@in-deptheng.com>.
It turns out this:


void *waitThread(void *arg);

void processQpidMessageForAWhile()
{
  pthread_t thread;
  pthread_create(&thread, NULL, waitThread, NULL);
  mySubscriptionManager.run();   // blocks until stop() is called
  pthread_join(thread, NULL);    // reclaim the helper thread
}

void *waitThread(void *arg)
{
   usleep(/* some interval */);
   mySubscriptionManager.stop();
   return NULL;                  // a void* function must return a value
}


only works once. After calling stop() once, subsequent calls to run() do nothing (stop() "closes" the queue, whatever that means). Unless you all consider that a bug that will eventually be fixed, I'm going to pursue Adam's suggestion of using LocalQueues.
-- 
View this message in context: http://n2.nabble.com/Interupting-SubscriptionManager%3A%3Arun%28%29-tp2634272p2642900.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.




Re: Interrupting SubscriptionManager::run()

Posted by RickW <ri...@in-deptheng.com>.

> Yes it would work, stop can be called from any thread. It's a bit clunky
> though. What would the ideal API for your task look like? A "doWork()"
> function that does a bounded amount of work rather than blocking like
> run()?


Yes. The API function I am trying to implement is

   processMessagesTimed(int milliseconds)

It processes messages for the specified interval and returns. So a runFor(interval) would be ideal for me. But any version of run() that would eventually return would be OK, e.g. processAllOutstandingEvents(), processOneEvent(). These are probably more generally useful.
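
A small sketch of what the two bounded variants could look like, assuming a non-blocking way to fetch a waiting message. `Poll`, `Handler`, and the `maxEvents` cap are hypothetical and for illustration only; these are not existing Qpid functions.

```cpp
#include <functional>
#include <string>

// Hypothetical non-blocking source: returns true and fills `msg` if a
// message was already waiting, false otherwise. Not a Qpid signature.
using Poll = std::function<bool(std::string&)>;
using Handler = std::function<void(const std::string&)>;

// Handle at most one waiting message; report whether one was handled.
bool processOneEvent(const Poll& poll, const Handler& handle) {
    std::string msg;
    if (!poll(msg))
        return false;
    handle(msg);
    return true;
}

// Handle everything that is waiting right now and return the count.
// `maxEvents` bounds the work so a busy queue cannot starve the caller.
int processAllOutstandingEvents(const Poll& poll, const Handler& handle,
                                int maxEvents = 1000) {
    int handled = 0;
    while (handled < maxEvents && processOneEvent(poll, handle))
        ++handled;
    return handled;
}
```

Both functions always return, so either one can be called from an Xt or Qt idle or timer callback without giving up control of the dispatch loop.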


> A file descriptor that could be polled for qpid activity?


That would be nice, but probably overkill and non-portable. Just a boolean function to tell me there is work to do would be helpful.
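
For comparison, a sketch of what a pollable descriptor could look like on POSIX systems, using the self-pipe technique: a receiving thread writes a byte when a message arrives, and the main loop watches the read end. `WakeupPipe` is a hypothetical helper, not a Qpid feature, and as noted it is not portable beyond POSIX.

```cpp
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <stdexcept>

// Hypothetical helper (POSIX only); not part of Qpid.
class WakeupPipe {
public:
    WakeupPipe() {
        if (pipe(fds_) != 0)
            throw std::runtime_error("pipe() failed");
        // Non-blocking on both ends so notify() and drain() never stall.
        fcntl(fds_[0], F_SETFL, fcntl(fds_[0], F_GETFL) | O_NONBLOCK);
        fcntl(fds_[1], F_SETFL, fcntl(fds_[1], F_GETFL) | O_NONBLOCK);
    }
    ~WakeupPipe() { close(fds_[0]); close(fds_[1]); }
    WakeupPipe(const WakeupPipe&) = delete;
    WakeupPipe& operator=(const WakeupPipe&) = delete;

    // Descriptor an event loop can watch for readability.
    int fd() const { return fds_[0]; }

    // Called by the thread that receives messages.
    void notify() {
        const char byte = 1;
        // If the pipe is full the write fails, but work is already flagged.
        const ssize_t written = write(fds_[1], &byte, 1);
        (void)written;
    }

    // The boolean "is there work to do?" check; waits up to timeoutMs.
    bool hasWork(int timeoutMs) const {
        pollfd p = {fds_[0], POLLIN, 0};
        return poll(&p, 1, timeoutMs) > 0 && (p.revents & POLLIN) != 0;
    }

    // Clear pending notifications before processing messages.
    void drain() {
        char buf[64];
        while (read(fds_[0], buf, sizeof buf) > 0) {}
    }

private:
    int fds_[2];
};
```

The value of `fd()` is the kind of descriptor that toolkit hooks such as Xt's `XtAppAddInput` or Qt's `QSocketNotifier` can watch, while `hasWork(0)` gives the plain boolean check.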
-- 
View this message in context: http://n2.nabble.com/Interupting-SubscriptionManager%3A%3Arun%28%29-tp2634272p2635706.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.




Re: Interrupting SubscriptionManager::run()

Posted by Alan Conway <ac...@redhat.com>.
On Tue, 2009-04-14 at 13:41 -0700, RickW wrote:
> 
> Why don't you just have the subscription manager running in its own
> thread?  I think that's the way it was designed to run.  Depending on
> what you do in your listener, you might need to use some locks.
> 
> 
> I don't think that will work for me. QPID is a replacement back-end for an existing messaging API. Not all users of the existing API can tolerate listeners called on a different thread.
> 
> 
> Or you could use LocalQueues which allow you to call get and they will
> take care of the synchronization.
> 
> 
> This will work, but I think it means I have to do my own message routing. (Read from the LocalQueues, figure out who wanted the message and call their listener(s).) This still may be my best option.
> 
> Can several queues feed the same LocalQueue?

No, unfortunately. This would be a good improvement.

> Would my approach have worked? Or does stop() have to be called from listener thread?

Yes, it would work; stop() can be called from any thread. It's a bit clunky,
though. What would the ideal API for your task look like? A "doWork()"
function that does a bounded amount of work rather than blocking like
run()? A file descriptor that could be polled for qpid activity?

There's some work going on to define a more user-friendly API; I'd like
to address our threading model as part of it.




Re: Interrupting SubscriptionManager::run()

Posted by RickW <ri...@in-deptheng.com>.

> Why don't you just have the subscription manager running in its own
> thread?  I think that's the way it was designed to run.  Depending on
> what you do in your listener, you might need to use some locks.


I don't think that will work for me. QPID is a replacement back-end for an existing messaging API. Not all users of the existing API can tolerate listeners called on a different thread.


> Or you could use LocalQueues which allow you to call get and they will
> take care of the synchronization.


This will work, but I think it means I have to do my own message routing. (Read from the LocalQueues, figure out who wanted the message and call their listener(s).) This still may be my best option.
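
A minimal sketch of that do-it-yourself routing step, assuming each message can be associated with the name of the queue it came from. `ListenerRouter` and its members are hypothetical names and not part of Qpid; listeners run on whichever thread calls `dispatch`, which is the property the existing API users need.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical router; names are illustrative, not Qpid API.
class ListenerRouter {
public:
    using Listener = std::function<void(const std::string& body)>;

    // Register interest in messages from one source queue.
    void subscribe(const std::string& queue, Listener listener) {
        listeners_[queue].push_back(std::move(listener));
    }

    // Deliver one message to every listener of its queue, on the calling
    // thread. Returns how many listeners were invoked.
    std::size_t dispatch(const std::string& queue,
                         const std::string& body) const {
        const auto it = listeners_.find(queue);
        if (it == listeners_.end())
            return 0;
        for (const Listener& listener : it->second)
            listener(body);
        return it->second.size();
    }

private:
    std::unordered_map<std::string, std::vector<Listener>> listeners_;
};
```

The main loop would read a message from a LocalQueue and then call `dispatch` with the queue name and message body.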

Can several queues feed the same LocalQueue?

Would my approach have worked? Or does stop() have to be called from listener thread?
-- 
View this message in context: http://n2.nabble.com/Interupting-SubscriptionManager%3A%3Arun%28%29-tp2634272p2635248.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.




Re: Interrupting SubscriptionManager::run()

Posted by Adam Chase <ad...@gmail.com>.
Why don't you just have the subscription manager running in its own
thread?  I think that's the way it was designed to run.  Depending on
what you do in your listener, you might need to use some locks.  Or
you could use LocalQueues which allow you to call get and they will
take care of the synchronization.

Adam

On 4/14/09, RickW <ri...@in-deptheng.com> wrote:
>
> What is the standard way for using QPID with other event-driven tools that
> want to control the dispatch loop (like Xt and Qt)?
>
> I was thinking of calling SubscriptionManager::run() periodically. I am
> considering something like this. Can stop() be called in another thread like
> this? (I.E. not in a listener) Is there a better way?
>
> void *waitThread(void *arg);
>
> void processQpidMessageForAWhile()
> {
>   pthread_t thread;
>   pthread_create(&thread, NULL, waitThread, NULL);
>   mySubscriptionManager.run();
> }
>
> void *waitThread(void *arg)
> {
>    usleep(/* some interval */);
>    mySubscriptionManager.stop();
> }
>
> Thanks in advance,
> Rick