You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by mz...@domdv.de on 2011/03/04 12:19:17 UTC
AMQ-CMS - questions about threads/producerthreads in AMQ
Hello there,
greetings from Germany. ;)
I'm currently trying to work myself into AMQ. I already tried several
approches to the producer thread question and have searched the web and the
forum. I'm also a beginner to the multithreading theme.
The situation:
I would like my application to create a consumer and a producer, which is no
problem. But both should live forever.
The consumer sleeps via a Countdownlatch, which will never be count down,
and wakes up via a Messagelistener, when a message is recieved.
The producer is my main problem. The application tells me to send a message.
Now i want to tell my producer thread to send this message syncron with
failure handling etc, while my application runs on. How do i tell the thread
to wait for a message? How do I tell the threads to live forever?
I already thought about using mutexes, semaphores, signals etc. But i found
out that the decaf threads already use pthread internally. So i guess there
exists functions for my problems, that i haven't found so far.
Propably my first trys do something which is way to complex, where there
already exists methods or wrapper:
I already tried to use signals, but the signal doesn't seems work with the
threadID i get from the decaf thread. Eg:
Application:
this->consumerThread = new Thread(&cmsConsAsync);
this->consumerThread->start();
cmsConsAsync.waitUntilReady();
this->producerThread = new Thread(&cmsProdAsync);
this->producerThread->start();
cmsProdAsync.waitUntilReady();
this->prodThreadID = this->producerThread->getId();
printf("PID: %lld\n", this->prodThreadID);
printf("State: %d\n", this->producerThread->getState());
if (this->producerThread->isAlive())
{
printf("IsAlive\n");
kill(this->prodThreadID, SIGUSR1);
printf("IsAlive2\n");
}
else
printf("I'm killed!\n");
The Producer:
/* set function calls */
signal(SIGUSR1, CMSProducer::sendTextMessage);
signal(SIGUSR2, CMSProducer::sendObjMessage);
signal(SIGQUIT, CMSProducer::shutdown);
//let caller run on
latch.countDown();
//set to sleep, will wake up on signal calls
//waitLatch.await();
while(CMSProducer::noQuit)
{
printf("PAUSE!\n");
pause();
}
Is there a better way to do something like this?
Best regards,
amqBeginner
Re: AMQ-CMS - questions about threads/producerthreads in AMQ
Posted by Timothy Bish <ta...@fusesource.com>.
On Fri, 2011-03-04 at 12:46 +0100, mzo@domdv.de wrote:
> Thanks for the fast reply.
>
> I already tested a bit with the example. I also think i know how esb and
> amq work basically.
>
> The problem is this:
>
> 1.) How can I create a Producer-Thread which sleeps until my programm
> wants to send a message? And how do I call the function which sends the
> message? I tried to do this with sleep() and signaling. But since the
> threadID i get is lld and not d it seems i cant send a signal to the
> thread. How is this usually done?
This is commonly done using a Blocking Queue, the producer thread waits
on something being enqueued and once it is, it dequeues it and does
whatever work is needed.
>
> 2.) As far as i understood, a thread which also derives from
> Messagelistener, will sleep if i call "latch.await();". And it will wake
> up if a message is on the topic. Does it goes to sleep afterwards?
>
Creating a Consumer object and registering a MessageListener will result
in messages being received in a separate thread, so there's not really
any need to create a new thread for your consumer.
> The problem is that I want to send async messages, which can make their
> retry and so on, without blocking my system.
> If I use the normal async handling, messages can get lost, which should
> never happen for my program. But normal syncron messages would block it. I
> guess I'm not the first person with such needs. What is usually done in
> such cases?
>
> So i thought that a thread sends the message for me and persists it.
>
> I have, up to the moment, little trouble in developing syncronous messages
> between my 2 components. But I would be rellay grateful if someone could
> point me to the starting point for async-message handling like I need it.
>
> I hope I could describe my problems good enough so that they are
> understandable.
>
> Best regards,
> amqBeginner
>
> > I think that if you go to section CMS-API-Overview
> >
> > http://activemq.apache.org/cms/cms-api-overview.html
> >
> >
> > <http://activemq.apache.org/cms/example.html>At the end, you could see
> > something that is more addaptable for that you want. I think ;-)
> >
> >
> > regards
> >
> > 2011/3/4 <mz...@domdv.de>
> >
> >> Hello there,
> >>
> >> greetings from Germany. ;)
> >>
> >> I'm currently trying to work myself into AMQ. I already tried several
> >> approches to the producer thread question and have searched the web and
> >> the
> >> forum. I'm also a beginner to the multithreading theme.
> >>
> >> The situation:
> >> I would like my application to create a consumer and a producer, which
> >> is
> >> no
> >> problem. But both should live forever.
> >> The consumer sleeps via a Countdownlatch, which will never be count
> >> down,
> >> and wakes up via a Messagelistener, when a message is recieved.
> >> The producer is my main problem. The application tells me to send a
> >> message.
> >> Now i want to tell my producer thread to send this message syncron with
> >> failure handling etc, while my application runs on. How do i tell the
> >> thread
> >> to wait for a message? How do I tell the threads to live forever?
> >>
> >> I already thought about using mutexes, semaphores, signals etc. But i
> >> found
> >> out that the decaf threads already use pthread internally. So i guess
> >> there
> >> exists functions for my problems, that i haven't found so far.
> >>
> >> Propably my first trys do something which is way to complex, where there
> >> already exists methods or wrapper:
> >>
> >> I already tried to use signals, but the signal doesn't seems work with
> >> the
> >> threadID i get from the decaf thread. Eg:
> >>
> >> Application:
> >>
> >> this->consumerThread = new Thread(&cmsConsAsync);
> >> this->consumerThread->start();
> >> cmsConsAsync.waitUntilReady();
> >> this->producerThread = new Thread(&cmsProdAsync);
> >> this->producerThread->start();
> >> cmsProdAsync.waitUntilReady();
> >> this->prodThreadID = this->producerThread->getId();
> >> printf("PID: %lld\n", this->prodThreadID);
> >> printf("State: %d\n", this->producerThread->getState());
> >> if (this->producerThread->isAlive())
> >> {
> >> printf("IsAlive\n");
> >> kill(this->prodThreadID, SIGUSR1);
> >> printf("IsAlive2\n");
> >> }
> >> else
> >> printf("I'm killed!\n");
> >>
> >> The Producer:
> >>
> >> /* set function calls */
> >> signal(SIGUSR1, CMSProducer::sendTextMessage);
> >> signal(SIGUSR2, CMSProducer::sendObjMessage);
> >> signal(SIGQUIT, CMSProducer::shutdown);
> >>
> >> //let caller run on
> >> latch.countDown();
> >>
> >> //set to sleep, will wake up on signal calls
> >> //waitLatch.await();
> >> while(CMSProducer::noQuit)
> >> {
> >> printf("PAUSE!\n");
> >> pause();
> >> }
> >>
> >> Is there a better way to do something like this?
> >>
> >> Best regards,
> >> amqBeginner
> >>
> >>
> >
> >
> > --
> > Óscar Pernas Plaza.
> >
>
>
>
>
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
Re: AMQ-CMS - questions about threads/producerthreads in AMQ
Posted by mz...@domdv.de.
Thanks for the fast reply.
I already tested a bit with the example. I also think i know how esb and
amq work basically.
The problem is this:
1.) How can I create a Producer-Thread which sleeps until my programm
wants to send a message? And how do I call the function which sends the
message? I tried to do this with sleep() and signaling. But since the
threadID i get is lld and not d it seems i cant send a signal to the
thread. How is this usually done?
2.) As far as i understood, a thread which also derives from
Messagelistener, will sleep if i call "latch.await();". And it will wake
up if a message is on the topic. Does it goes to sleep afterwards?
The problem is that I want to send async messages, which can make their
retry and so on, without blocking my system.
If I use the normal async handling, messages can get lost, which should
never happen for my program. But normal syncron messages would block it. I
guess I'm not the first person with such needs. What is usually done in
such cases?
So i thought that a thread sends the message for me and persists it.
I have, up to the moment, little trouble in developing syncronous messages
between my 2 components. But I would be rellay grateful if someone could
point me to the starting point for async-message handling like I need it.
I hope I could describe my problems good enough so that they are
understandable.
Best regards,
amqBeginner
> I think that if you go to section CMS-API-Overview
>
> http://activemq.apache.org/cms/cms-api-overview.html
>
>
> <http://activemq.apache.org/cms/example.html>At the end, you could see
> something that is more addaptable for that you want. I think ;-)
>
>
> regards
>
> 2011/3/4 <mz...@domdv.de>
>
>> Hello there,
>>
>> greetings from Germany. ;)
>>
>> I'm currently trying to work myself into AMQ. I already tried several
>> approches to the producer thread question and have searched the web and
>> the
>> forum. I'm also a beginner to the multithreading theme.
>>
>> The situation:
>> I would like my application to create a consumer and a producer, which
>> is
>> no
>> problem. But both should live forever.
>> The consumer sleeps via a Countdownlatch, which will never be count
>> down,
>> and wakes up via a Messagelistener, when a message is recieved.
>> The producer is my main problem. The application tells me to send a
>> message.
>> Now i want to tell my producer thread to send this message syncron with
>> failure handling etc, while my application runs on. How do i tell the
>> thread
>> to wait for a message? How do I tell the threads to live forever?
>>
>> I already thought about using mutexes, semaphores, signals etc. But i
>> found
>> out that the decaf threads already use pthread internally. So i guess
>> there
>> exists functions for my problems, that i haven't found so far.
>>
>> Propably my first trys do something which is way to complex, where there
>> already exists methods or wrapper:
>>
>> I already tried to use signals, but the signal doesn't seems work with
>> the
>> threadID i get from the decaf thread. Eg:
>>
>> Application:
>>
>> this->consumerThread = new Thread(&cmsConsAsync);
>> this->consumerThread->start();
>> cmsConsAsync.waitUntilReady();
>> this->producerThread = new Thread(&cmsProdAsync);
>> this->producerThread->start();
>> cmsProdAsync.waitUntilReady();
>> this->prodThreadID = this->producerThread->getId();
>> printf("PID: %lld\n", this->prodThreadID);
>> printf("State: %d\n", this->producerThread->getState());
>> if (this->producerThread->isAlive())
>> {
>> printf("IsAlive\n");
>> kill(this->prodThreadID, SIGUSR1);
>> printf("IsAlive2\n");
>> }
>> else
>> printf("I'm killed!\n");
>>
>> The Producer:
>>
>> /* set function calls */
>> signal(SIGUSR1, CMSProducer::sendTextMessage);
>> signal(SIGUSR2, CMSProducer::sendObjMessage);
>> signal(SIGQUIT, CMSProducer::shutdown);
>>
>> //let caller run on
>> latch.countDown();
>>
>> //set to sleep, will wake up on signal calls
>> //waitLatch.await();
>> while(CMSProducer::noQuit)
>> {
>> printf("PAUSE!\n");
>> pause();
>> }
>>
>> Is there a better way to do something like this?
>>
>> Best regards,
>> amqBeginner
>>
>>
>
>
> --
> Óscar Pernas Plaza.
>
Re: AMQ-CMS - questions about threads/producerthreads in AMQ
Posted by Oscar Pernas <os...@pernas.es>.
Hi mzo,
To do what you want to do I did a conditional mutex over a concurrent access
queue.
If you want to take a look of my code:
https://code.google.com/p/activeinterface/source/browse/#svn%2FActiveInterface%2Fsrc%2Fcore%2Fwrapper
<https://code.google.com/p/activeinterface/source/browse/#svn%2FActiveInterface%2Fsrc%2Fcore%2Fwrapper>take
a look of the ActiveProducer class, that has a ActiveProducerThread class.
regards
2011/3/7 Timothy Bish <ta...@fusesource.com>
> On Mon, 2011-03-07 at 09:28 +0100, mzo@domdv.de wrote:
> > Hello Timothy,
> >
> > > Creating a Consumer object and registering a MessageListener will
> result
> > > in messages being received in a separate thread, so there's not really
> > > any need to create a new thread for your consumer.
> >
> > That explains the count of threads upon debugging. I already thought that
> > may be the case, but its good to have it confirmed.
> >
> > > This is commonly done using a Blocking Queue, the producer thread waits
> > > on something being enqueued and once it is, it dequeues it and does
> > > whatever work is needed.
> >
> > But with a queue there would be a problem that if a subscriber gets
> > offline due networking reasons, the messages would stack until a memory
> > error occurs, or?
> >
> > And with a "Blocking Queue" you mean simply a sync Queue or? Are there
> > samples for this?
>
> A sync Q yes. You look at the MessageDispatchChannel classes in the src
> bundle for an idea of how you might implement something that fits your
> requirements.
>
> Regards
>
> --
> Tim Bish
> ------------
> FuseSource
> Email: tim.bish@fusesource.com
> Web: http://fusesource.com
> Twitter: tabish121
> Blog: http://timbish.blogspot.com/
>
>
>
--
Óscar Pernas Plaza.
Re: AMQ-CMS - questions about threads/producerthreads in AMQ
Posted by Timothy Bish <ta...@fusesource.com>.
On Mon, 2011-03-07 at 09:28 +0100, mzo@domdv.de wrote:
> Hello Timothy,
>
> > Creating a Consumer object and registering a MessageListener will result
> > in messages being received in a separate thread, so there's not really
> > any need to create a new thread for your consumer.
>
> That explains the count of threads upon debugging. I already thought that
> may be the case, but its good to have it confirmed.
>
> > This is commonly done using a Blocking Queue, the producer thread waits
> > on something being enqueued and once it is, it dequeues it and does
> > whatever work is needed.
>
> But with a queue there would be a problem that if a subscriber gets
> offline due networking reasons, the messages would stack until a memory
> error occurs, or?
>
> And with a "Blocking Queue" you mean simply a sync Queue or? Are there
> samples for this?
A sync Q yes. You look at the MessageDispatchChannel classes in the src
bundle for an idea of how you might implement something that fits your
requirements.
Regards
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
Re: AMQ-CMS - questions about threads/producerthreads in AMQ
Posted by mz...@domdv.de.
Hello Timothy,
> Creating a Consumer object and registering a MessageListener will result
> in messages being received in a separate thread, so there's not really
> any need to create a new thread for your consumer.
That explains the count of threads upon debugging. I already thought that
may be the case, but its good to have it confirmed.
> This is commonly done using a Blocking Queue, the producer thread waits
> on something being enqueued and once it is, it dequeues it and does
> whatever work is needed.
But with a queue there would be a problem that if a subscriber gets
offline due networking reasons, the messages would stack until a memory
error occurs, or?
And with a "Blocking Queue" you mean simply a sync Queue or? Are there
samples for this?
Thanks for the help!
Best regards,
amqBeginner
Re: AMQ-CMS - questions about threads/producerthreads in AMQ
Posted by mz...@domdv.de.
Hello Timothy, hello Óscar,
I would like to thank both of you for giving me some new insights to AMQ.
I think I got a good clue on how to go on.
@Oscar: Yeah, your sample is around to the thing I want to do. Thanks very
much. I can now better understand the mutexes used with AMQ. And what kind
of mutexes I must use.
Thanks again!
Best regards,
amqBeginner
Re: AMQ-CMS - questions about threads/producerthreads in AMQ
Posted by Oscar Pernas <os...@pernas.es>.
I think that if you go to section CMS-API-Overview
http://activemq.apache.org/cms/cms-api-overview.html
<http://activemq.apache.org/cms/example.html>At the end, you could see
something that is more addaptable for that you want. I think ;-)
regards
2011/3/4 <mz...@domdv.de>
> Hello there,
>
> greetings from Germany. ;)
>
> I'm currently trying to work myself into AMQ. I already tried several
> approches to the producer thread question and have searched the web and the
> forum. I'm also a beginner to the multithreading theme.
>
> The situation:
> I would like my application to create a consumer and a producer, which is
> no
> problem. But both should live forever.
> The consumer sleeps via a Countdownlatch, which will never be count down,
> and wakes up via a Messagelistener, when a message is recieved.
> The producer is my main problem. The application tells me to send a
> message.
> Now i want to tell my producer thread to send this message syncron with
> failure handling etc, while my application runs on. How do i tell the
> thread
> to wait for a message? How do I tell the threads to live forever?
>
> I already thought about using mutexes, semaphores, signals etc. But i found
> out that the decaf threads already use pthread internally. So i guess there
> exists functions for my problems, that i haven't found so far.
>
> Propably my first trys do something which is way to complex, where there
> already exists methods or wrapper:
>
> I already tried to use signals, but the signal doesn't seems work with the
> threadID i get from the decaf thread. Eg:
>
> Application:
>
> this->consumerThread = new Thread(&cmsConsAsync);
> this->consumerThread->start();
> cmsConsAsync.waitUntilReady();
> this->producerThread = new Thread(&cmsProdAsync);
> this->producerThread->start();
> cmsProdAsync.waitUntilReady();
> this->prodThreadID = this->producerThread->getId();
> printf("PID: %lld\n", this->prodThreadID);
> printf("State: %d\n", this->producerThread->getState());
> if (this->producerThread->isAlive())
> {
> printf("IsAlive\n");
> kill(this->prodThreadID, SIGUSR1);
> printf("IsAlive2\n");
> }
> else
> printf("I'm killed!\n");
>
> The Producer:
>
> /* set function calls */
> signal(SIGUSR1, CMSProducer::sendTextMessage);
> signal(SIGUSR2, CMSProducer::sendObjMessage);
> signal(SIGQUIT, CMSProducer::shutdown);
>
> //let caller run on
> latch.countDown();
>
> //set to sleep, will wake up on signal calls
> //waitLatch.await();
> while(CMSProducer::noQuit)
> {
> printf("PAUSE!\n");
> pause();
> }
>
> Is there a better way to do something like this?
>
> Best regards,
> amqBeginner
>
>
--
Óscar Pernas Plaza.