You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by mz...@domdv.de on 2011/03/04 12:19:17 UTC

AMQ-CMS - questions about threads/producerthreads in AMQ

Hello there,

greetings from Germany. ;)

I'm currently trying to work myself into AMQ. I already tried several
approches to the producer thread question and have searched the web and the
forum. I'm also a beginner to the multithreading theme.

The situation:
I would like my application to create a consumer and a producer, which is no
problem. But both should live forever.
The consumer sleeps via a Countdownlatch, which will never be count down,
and wakes up via a Messagelistener, when a message is recieved.
The producer is my main problem. The application tells me to send a message.
Now i want to tell my producer thread to send this message syncron with
failure handling etc, while my application runs on. How do i tell the thread
to wait for a message? How do I tell the threads to live forever?

I already thought about using mutexes, semaphores, signals etc. But i found
out that the decaf threads already use pthread internally. So i guess there
exists functions for my problems, that i haven't found so far.

Propably my first trys do something which is way to complex, where there
already exists methods or wrapper:

I already tried to use signals, but the signal doesn't seems work with the
threadID i get from the decaf thread. Eg:

Application:

this->consumerThread = new Thread(&cmsConsAsync);
this->consumerThread->start();
cmsConsAsync.waitUntilReady();
this->producerThread = new Thread(&cmsProdAsync);
this->producerThread->start();
cmsProdAsync.waitUntilReady();
this->prodThreadID = this->producerThread->getId();
printf("PID: %lld\n", this->prodThreadID);
printf("State: %d\n", this->producerThread->getState());
if (this->producerThread->isAlive())
 {
      printf("IsAlive\n");
      kill(this->prodThreadID, SIGUSR1);
      printf("IsAlive2\n");
 }
 else
      printf("I'm killed!\n");

The Producer:

            /* set function calls */
            signal(SIGUSR1, CMSProducer::sendTextMessage);
            signal(SIGUSR2, CMSProducer::sendObjMessage);
            signal(SIGQUIT, CMSProducer::shutdown);

            //let caller run on
            latch.countDown();

            //set to sleep, will wake up on signal calls
            //waitLatch.await();
            while(CMSProducer::noQuit)
            {
                printf("PAUSE!\n");
                pause();
            }

Is there a better way to do something like this?

Best regards,
amqBeginner


Re: AMQ-CMS - questions about threads/producerthreads in AMQ

Posted by Timothy Bish <ta...@fusesource.com>.
On Fri, 2011-03-04 at 12:46 +0100, mzo@domdv.de wrote:
> Thanks for the fast reply.
> 
> I already tested a bit with the example. I also think i know how esb and
> amq work basically.
> 
> The problem is this:
> 
> 1.) How can I create a Producer-Thread which sleeps until my programm
> wants to send a message? And how do I call the function which sends the
> message? I tried to do this with sleep() and signaling. But since the
> threadID i get is lld and not d it seems i cant send a signal to the
> thread. How is this usually done?

This is commonly done using a Blocking Queue, the producer thread waits
on something being enqueued and once it is, it dequeues it and does
whatever work is needed.  

> 
> 2.) As far as i understood, a thread which also derives from
> Messagelistener, will sleep if i call "latch.await();". And it will wake
> up if a message is on the topic. Does it goes to sleep afterwards?
> 

Creating a Consumer object and registering a MessageListener will result
in messages being received in a separate thread, so there's not really
any need to create a new thread for your consumer.

> The problem is that I want to send async messages, which can make their
> retry and so on, without blocking my system.
> If I use the normal async handling, messages can get lost, which should
> never happen for my program. But normal syncron messages would block it. I
> guess I'm not the first person with such needs. What is usually done in
> such cases?
> 
> So i thought that a thread sends the message for me and persists it.
> 
> I have, up to the moment, little trouble in developing syncronous messages
> between my 2 components. But I would be rellay grateful if someone could
> point me to the starting point for async-message handling like I need it.
> 
> I hope I could describe my problems good enough so that they are
> understandable.
> 
> Best regards,
> amqBeginner
> 
> > I think that if you go to section CMS-API-Overview
> >
> > http://activemq.apache.org/cms/cms-api-overview.html
> >
> >
> >  <http://activemq.apache.org/cms/example.html>At the end, you could see
> > something that is more addaptable for that you want. I think ;-)
> >
> >
> > regards
> >
> > 2011/3/4 <mz...@domdv.de>
> >
> >> Hello there,
> >>
> >> greetings from Germany. ;)
> >>
> >> I'm currently trying to work myself into AMQ. I already tried several
> >> approches to the producer thread question and have searched the web and
> >> the
> >> forum. I'm also a beginner to the multithreading theme.
> >>
> >> The situation:
> >> I would like my application to create a consumer and a producer, which
> >> is
> >> no
> >> problem. But both should live forever.
> >> The consumer sleeps via a Countdownlatch, which will never be count
> >> down,
> >> and wakes up via a Messagelistener, when a message is recieved.
> >> The producer is my main problem. The application tells me to send a
> >> message.
> >> Now i want to tell my producer thread to send this message syncron with
> >> failure handling etc, while my application runs on. How do i tell the
> >> thread
> >> to wait for a message? How do I tell the threads to live forever?
> >>
> >> I already thought about using mutexes, semaphores, signals etc. But i
> >> found
> >> out that the decaf threads already use pthread internally. So i guess
> >> there
> >> exists functions for my problems, that i haven't found so far.
> >>
> >> Propably my first trys do something which is way to complex, where there
> >> already exists methods or wrapper:
> >>
> >> I already tried to use signals, but the signal doesn't seems work with
> >> the
> >> threadID i get from the decaf thread. Eg:
> >>
> >> Application:
> >>
> >> this->consumerThread = new Thread(&cmsConsAsync);
> >> this->consumerThread->start();
> >> cmsConsAsync.waitUntilReady();
> >> this->producerThread = new Thread(&cmsProdAsync);
> >> this->producerThread->start();
> >> cmsProdAsync.waitUntilReady();
> >> this->prodThreadID = this->producerThread->getId();
> >> printf("PID: %lld\n", this->prodThreadID);
> >> printf("State: %d\n", this->producerThread->getState());
> >> if (this->producerThread->isAlive())
> >>  {
> >>      printf("IsAlive\n");
> >>      kill(this->prodThreadID, SIGUSR1);
> >>      printf("IsAlive2\n");
> >>  }
> >>  else
> >>      printf("I'm killed!\n");
> >>
> >> The Producer:
> >>
> >>            /* set function calls */
> >>            signal(SIGUSR1, CMSProducer::sendTextMessage);
> >>            signal(SIGUSR2, CMSProducer::sendObjMessage);
> >>            signal(SIGQUIT, CMSProducer::shutdown);
> >>
> >>            //let caller run on
> >>            latch.countDown();
> >>
> >>            //set to sleep, will wake up on signal calls
> >>            //waitLatch.await();
> >>            while(CMSProducer::noQuit)
> >>            {
> >>                printf("PAUSE!\n");
> >>                pause();
> >>            }
> >>
> >> Is there a better way to do something like this?
> >>
> >> Best regards,
> >> amqBeginner
> >>
> >>
> >
> >
> > --
> > Óscar Pernas Plaza.
> >
> 
> 
> 
> 

-- 
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/



Re: AMQ-CMS - questions about threads/producerthreads in AMQ

Posted by mz...@domdv.de.
Thanks for the fast reply.

I already tested a bit with the example. I also think i know how esb and
amq work basically.

The problem is this:

1.) How can I create a Producer-Thread which sleeps until my programm
wants to send a message? And how do I call the function which sends the
message? I tried to do this with sleep() and signaling. But since the
threadID i get is lld and not d it seems i cant send a signal to the
thread. How is this usually done?

2.) As far as i understood, a thread which also derives from
Messagelistener, will sleep if i call "latch.await();". And it will wake
up if a message is on the topic. Does it goes to sleep afterwards?

The problem is that I want to send async messages, which can make their
retry and so on, without blocking my system.
If I use the normal async handling, messages can get lost, which should
never happen for my program. But normal syncron messages would block it. I
guess I'm not the first person with such needs. What is usually done in
such cases?

So i thought that a thread sends the message for me and persists it.

I have, up to the moment, little trouble in developing syncronous messages
between my 2 components. But I would be rellay grateful if someone could
point me to the starting point for async-message handling like I need it.

I hope I could describe my problems good enough so that they are
understandable.

Best regards,
amqBeginner

> I think that if you go to section CMS-API-Overview
>
> http://activemq.apache.org/cms/cms-api-overview.html
>
>
>  <http://activemq.apache.org/cms/example.html>At the end, you could see
> something that is more addaptable for that you want. I think ;-)
>
>
> regards
>
> 2011/3/4 <mz...@domdv.de>
>
>> Hello there,
>>
>> greetings from Germany. ;)
>>
>> I'm currently trying to work myself into AMQ. I already tried several
>> approches to the producer thread question and have searched the web and
>> the
>> forum. I'm also a beginner to the multithreading theme.
>>
>> The situation:
>> I would like my application to create a consumer and a producer, which
>> is
>> no
>> problem. But both should live forever.
>> The consumer sleeps via a Countdownlatch, which will never be count
>> down,
>> and wakes up via a Messagelistener, when a message is recieved.
>> The producer is my main problem. The application tells me to send a
>> message.
>> Now i want to tell my producer thread to send this message syncron with
>> failure handling etc, while my application runs on. How do i tell the
>> thread
>> to wait for a message? How do I tell the threads to live forever?
>>
>> I already thought about using mutexes, semaphores, signals etc. But i
>> found
>> out that the decaf threads already use pthread internally. So i guess
>> there
>> exists functions for my problems, that i haven't found so far.
>>
>> Propably my first trys do something which is way to complex, where there
>> already exists methods or wrapper:
>>
>> I already tried to use signals, but the signal doesn't seems work with
>> the
>> threadID i get from the decaf thread. Eg:
>>
>> Application:
>>
>> this->consumerThread = new Thread(&cmsConsAsync);
>> this->consumerThread->start();
>> cmsConsAsync.waitUntilReady();
>> this->producerThread = new Thread(&cmsProdAsync);
>> this->producerThread->start();
>> cmsProdAsync.waitUntilReady();
>> this->prodThreadID = this->producerThread->getId();
>> printf("PID: %lld\n", this->prodThreadID);
>> printf("State: %d\n", this->producerThread->getState());
>> if (this->producerThread->isAlive())
>>  {
>>      printf("IsAlive\n");
>>      kill(this->prodThreadID, SIGUSR1);
>>      printf("IsAlive2\n");
>>  }
>>  else
>>      printf("I'm killed!\n");
>>
>> The Producer:
>>
>>            /* set function calls */
>>            signal(SIGUSR1, CMSProducer::sendTextMessage);
>>            signal(SIGUSR2, CMSProducer::sendObjMessage);
>>            signal(SIGQUIT, CMSProducer::shutdown);
>>
>>            //let caller run on
>>            latch.countDown();
>>
>>            //set to sleep, will wake up on signal calls
>>            //waitLatch.await();
>>            while(CMSProducer::noQuit)
>>            {
>>                printf("PAUSE!\n");
>>                pause();
>>            }
>>
>> Is there a better way to do something like this?
>>
>> Best regards,
>> amqBeginner
>>
>>
>
>
> --
> Óscar Pernas Plaza.
>





Re: AMQ-CMS - questions about threads/producerthreads in AMQ

Posted by Oscar Pernas <os...@pernas.es>.
Hi mzo,

To do what you want to do I did a conditional mutex over a concurrent access
queue.

If you want to take a look of my code:

https://code.google.com/p/activeinterface/source/browse/#svn%2FActiveInterface%2Fsrc%2Fcore%2Fwrapper

<https://code.google.com/p/activeinterface/source/browse/#svn%2FActiveInterface%2Fsrc%2Fcore%2Fwrapper>take
a look of the ActiveProducer class, that has a ActiveProducerThread class.


regards

2011/3/7 Timothy Bish <ta...@fusesource.com>

> On Mon, 2011-03-07 at 09:28 +0100, mzo@domdv.de wrote:
> > Hello Timothy,
> >
> > > Creating a Consumer object and registering a MessageListener will
> result
> > > in messages being received in a separate thread, so there's not really
> > > any need to create a new thread for your consumer.
> >
> > That explains the count of threads upon debugging. I already thought that
> > may be the case, but its good to have it confirmed.
> >
> > > This is commonly done using a Blocking Queue, the producer thread waits
> > > on something being enqueued and once it is, it dequeues it and does
> > > whatever work is needed.
> >
> > But with a queue there would be a problem that if a subscriber gets
> > offline due networking reasons, the messages would stack until a memory
> > error occurs, or?
> >
> > And with a "Blocking Queue" you mean simply a sync Queue or? Are there
> > samples for this?
>
> A sync Q yes.  You look at the MessageDispatchChannel classes in the src
> bundle for an idea of how you might implement something that fits your
> requirements.
>
> Regards
>
> --
> Tim Bish
> ------------
> FuseSource
> Email: tim.bish@fusesource.com
> Web: http://fusesource.com
> Twitter: tabish121
> Blog: http://timbish.blogspot.com/
>
>
>


-- 
Óscar Pernas Plaza.

Re: AMQ-CMS - questions about threads/producerthreads in AMQ

Posted by Timothy Bish <ta...@fusesource.com>.
On Mon, 2011-03-07 at 09:28 +0100, mzo@domdv.de wrote:
> Hello Timothy,
> 
> > Creating a Consumer object and registering a MessageListener will result
> > in messages being received in a separate thread, so there's not really
> > any need to create a new thread for your consumer.
> 
> That explains the count of threads upon debugging. I already thought that
> may be the case, but its good to have it confirmed.
> 
> > This is commonly done using a Blocking Queue, the producer thread waits
> > on something being enqueued and once it is, it dequeues it and does
> > whatever work is needed.
> 
> But with a queue there would be a problem that if a subscriber gets
> offline due networking reasons, the messages would stack until a memory
> error occurs, or?
> 
> And with a "Blocking Queue" you mean simply a sync Queue or? Are there
> samples for this?

A sync Q yes.  You look at the MessageDispatchChannel classes in the src
bundle for an idea of how you might implement something that fits your
requirements.  

Regards

-- 
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/



Re: AMQ-CMS - questions about threads/producerthreads in AMQ

Posted by mz...@domdv.de.
Hello Timothy,

> Creating a Consumer object and registering a MessageListener will result
> in messages being received in a separate thread, so there's not really
> any need to create a new thread for your consumer.

That explains the count of threads upon debugging. I already thought that
may be the case, but its good to have it confirmed.

> This is commonly done using a Blocking Queue, the producer thread waits
> on something being enqueued and once it is, it dequeues it and does
> whatever work is needed.

But with a queue there would be a problem that if a subscriber gets
offline due networking reasons, the messages would stack until a memory
error occurs, or?

And with a "Blocking Queue" you mean simply a sync Queue or? Are there
samples for this?

Thanks for the help!

Best regards,
amqBeginner


Re: AMQ-CMS - questions about threads/producerthreads in AMQ

Posted by mz...@domdv.de.
Hello Timothy, hello Óscar,

I would like to thank both of you for giving me some new insights to AMQ.
I think I got a good clue on how to go on.

@Oscar: Yeah, your sample is around to the thing I want to do. Thanks very
much. I can now better understand the mutexes used with AMQ. And what kind
of mutexes I must use.

Thanks again!

Best regards,
amqBeginner


Re: AMQ-CMS - questions about threads/producerthreads in AMQ

Posted by Oscar Pernas <os...@pernas.es>.
I think that if you go to section CMS-API-Overview

http://activemq.apache.org/cms/cms-api-overview.html


 <http://activemq.apache.org/cms/example.html>At the end, you could see
something that is more addaptable for that you want. I think ;-)


regards

2011/3/4 <mz...@domdv.de>

> Hello there,
>
> greetings from Germany. ;)
>
> I'm currently trying to work myself into AMQ. I already tried several
> approches to the producer thread question and have searched the web and the
> forum. I'm also a beginner to the multithreading theme.
>
> The situation:
> I would like my application to create a consumer and a producer, which is
> no
> problem. But both should live forever.
> The consumer sleeps via a Countdownlatch, which will never be count down,
> and wakes up via a Messagelistener, when a message is recieved.
> The producer is my main problem. The application tells me to send a
> message.
> Now i want to tell my producer thread to send this message syncron with
> failure handling etc, while my application runs on. How do i tell the
> thread
> to wait for a message? How do I tell the threads to live forever?
>
> I already thought about using mutexes, semaphores, signals etc. But i found
> out that the decaf threads already use pthread internally. So i guess there
> exists functions for my problems, that i haven't found so far.
>
> Propably my first trys do something which is way to complex, where there
> already exists methods or wrapper:
>
> I already tried to use signals, but the signal doesn't seems work with the
> threadID i get from the decaf thread. Eg:
>
> Application:
>
> this->consumerThread = new Thread(&cmsConsAsync);
> this->consumerThread->start();
> cmsConsAsync.waitUntilReady();
> this->producerThread = new Thread(&cmsProdAsync);
> this->producerThread->start();
> cmsProdAsync.waitUntilReady();
> this->prodThreadID = this->producerThread->getId();
> printf("PID: %lld\n", this->prodThreadID);
> printf("State: %d\n", this->producerThread->getState());
> if (this->producerThread->isAlive())
>  {
>      printf("IsAlive\n");
>      kill(this->prodThreadID, SIGUSR1);
>      printf("IsAlive2\n");
>  }
>  else
>      printf("I'm killed!\n");
>
> The Producer:
>
>            /* set function calls */
>            signal(SIGUSR1, CMSProducer::sendTextMessage);
>            signal(SIGUSR2, CMSProducer::sendObjMessage);
>            signal(SIGQUIT, CMSProducer::shutdown);
>
>            //let caller run on
>            latch.countDown();
>
>            //set to sleep, will wake up on signal calls
>            //waitLatch.await();
>            while(CMSProducer::noQuit)
>            {
>                printf("PAUSE!\n");
>                pause();
>            }
>
> Is there a better way to do something like this?
>
> Best regards,
> amqBeginner
>
>


-- 
Óscar Pernas Plaza.