You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by mfan <ch...@gmail.com> on 2015/09/29 03:12:44 UTC

The consumer thread closed right after producer thread finishes sending messages.

I am trying to write an application that a producer (with PERSISTENT delivery
mode) sends 80000 messages to asynchronous a consumer. Both producer and
consumer are in acknowledgement mode (AUTO_ACKNOWLEDGE).  After the consumer
received a message, it will do some work.  Somehow the producer producing
message faster than consumer did the work, and right after the producer
finished sending it's last message, both consumer called destructor
automatically.  How to let the consumer continuously getting the message and
do the computation ?  I search online and found maybe due to the
memoryLimit. So I check the activemq.xml file in
'/home/apache-activemq-5.11.1/data/' directory, I copied partial related to
memory paragraph in below which I do not quite understand, I need help to
make sure the memory was not the issue. If that is true, what else problem
could be ? Thank you for helping.


Store limit is 102400 mb (current store usage is 72 mb). The data directory:
/home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable space -
resetting to maximum available disk space: 12990 mb |
org.apache.activemq.broker.BrokerService | main
2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb, whilst
the temporary data directory:
/home/apache-activemq-5.11.1/data/localhost/tmp_storage only has 12918 mb of
usable space - resetting to maximum available 12918 mb. |
org.apache.activemq.broker.BrokerService | main



--
View this message in context: http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: The consumer thread closed right after producer thread finishes sending messages.

Posted by Tim Bain <tb...@alumni.duke.edu>.
You could say that if you haven't received a message and it's been longer
than some time interval (1 second? 5 seconds? 10 minutes? whatever you
want) since you finished processing the last message, then you assume that
there's nothing more to process.  You could do that in a few different
ways, but one would be to start a timer at the end of your onMessage()
method that will call cLatch.countDown() when the timer fires, and cancel
the timer (if one exists) at the beginning of onMessage().

An alternative solution would be to have an atomic/threadsafe integer
containing the number of outstanding messages and an atomic/threadsafe
boolean containing a flag to say that the producer has sent all of the
messages it's going to send.  Then when allMessagesSent is true and the
consumer has received as many messages as the producer says it sent, you
know the consumer is done and you can count down the latch

Also, you probably could initialize the countdown latch to 1 instead of 2
and have the producer not call countDown() and only call await(); my
initial suggestion works but is more complicated than it needs to be...

Tim

On Tue, Sep 29, 2015 at 6:16 PM, Chaomei Lo <ch...@gmail.com> wrote:

> Thanks. Tim. sorry to confuse you, I am doing C++ and the Java broker did
> not exit when my c++ application exited.  Yes, I am using one processor
> with two threads involved.  I appreciate your suggestions, now I understand
> how that CountDownLatch works, that made me realize I modified the
> HellowWorld example code wrong.  Now I fixed it then I modified one line of
> the example code from doneLatch.await(Millis)
> to
> doneLatch.await(), and have the user to specify the number of messages and
> pass it to consumer for the doneLatch, and it works fine.
> But to have a better solution, if I do not want user to input the number of
> messages, I still have work to do; I would go with your second suggestion.
> First I would create a CountDownLatch object with 2 counts in main() and
> then pass this instance cLatch to both consumer and producer.
>
> CountDownLatch cLatch(2);
> cLatch.await();
>
> then invoke
> cLatch.countDown() in producer code after producer finishes
> cLatch.countDown() in consumer code after consumer finishes
>
> But I am wondering how do I know when the consumer would finish.  the
> onMessage() will be invoked every time when the message comes in. Without
> knowing the number of messages beforehand, do I know when it will be
> finished ?  Did I think wrong on what you have suggested ?
>
> Thanks.
>
> On Mon, Sep 28, 2015 at 9:46 PM, Tim Bain <tb...@alumni.duke.edu> wrote:
>
> > Now I'm confused.  Are you doing C++ or Java?  You referenced both .cpp
> > files and the JVM, and I don't know how those two go together.  Unless
> you
> > mean that the broker's JVM is exiting when being accessed by C++ clients?
> > Also, I assumed (since you haven't clearly said one way or the other)
> that
> > your consumer and your producer are different threads in the same
> process;
> > if that's not true; disregard all of what I've written below, and my
> > earlier suggestion.
> >
> > I think you could do one (and only one) of the following options to
> prevent
> > the producer thread from exiting before the consumer thread (and there
> are
> > others ways that will work, don't feel constrained by them if you have
> > another one):
> >
> >    1. Have the producer thread call consumerThread.join() (which means
> you
> >    have to pass a reference from one thread to the other), which forces
> the
> >    consumer thread to exit before the producer thread is allowed to.
> >    2. Have a CountDownLatch initialized to 2, where both threads call
> >    countDown() after they've done all of their work, which guarantees
> that
> > the
> >    consumer's work is all done before the producer thread exits, even if
> > it's
> >    still possible for the producer thread to actually exit first (who
> > cares).
> >    3. Have an AtomicBoolean called something like consumerDone
> initialized
> >    to false and only set to true when the consumer thread has done all of
> > its
> >    work, and have the producer thread execute the following loop after
> all
> > of
> >    its work is done:  while (!consumerDone.get()) {
> >    Thread.sleep(someDuration);}
> >    4. Execute consumerThread's work in a FutureTask, and call get() on
> the
> >    Future interface it implements to wait until the task is done.
> >
> > There's no point doing more than one of those approaches; it's
> duplicative
> > and unnecessarily complicates your code.  I'd avoid #1 because creating
> > your own threads is generally frowned upon these days; in most cases you
> > should be using Executors to run threaded work because it's easier to
> scale
> > them and to handle problems, so I wouldn't recommend choosing an option
> > that prevents you from doing that and will force you to rewrite that code
> > if you ever wanted to.
> >
> > Tim
> >
> > On Sep 28, 2015 8:51 PM, "Chaomei Lo" <ch...@gmail.com> wrote:
> >
> > > I did have these lines in main.
> > >   producerThread.join();
> > >   consumerThread.join();
> > >
> > > and in run() in Consumer.cpp, I have
> > >
> > >    latch.countDown();
> > >
> > > and in onMessage() in Consumer.cpp, I have
> > >   doneLatch.countDown();
> > >
> > > Am I doing right ? But I am having a problem to understand what they
> are
> > > for.  I made a mistake on my code, so after fixing, I got about 45000
> > > message, and then JVM exited out.  The activemq.log is in below. Can
> you
> > > see anything wrong ? Thank you.
> > > --------------------------------------------------------
> > > 2015-09-28 19:09:01,412 | INFO  |
> > > queue://Consumer.B.VirtualTopic.TestDestination purged of 0 messages |
> > > org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:01,414 | INFO  | queue://testBasics1 purged of 0
> > messages
> > > | org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,649 | INFO  | queue://TEST.FOO purged of 47217
> > messages
> > > | org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,651 | INFO  |
> > > queue://testSessionCommitAfterConsumerClosed purged of 0 messages |
> > > org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,653 | INFO  | queue://testSend1 purged of 0
> messages
> > |
> > > org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,655 | INFO  |
> > > queue://Consumer.A.VirtualTopic.TestDestination purged of 0 messages |
> > > org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,656 | INFO  | queue://testReceive1 purged of 0
> > messages
> > > | org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,660 | INFO  | queue://Queue-1438296202694 purged
> of 0
> > > messages | org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,663 | INFO  | queue://CmsSendWithAsyncCallbackTest
> > > purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI
> TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,665 | INFO  | queue://Queue-1438296199638 purged
> of 0
> > > messages | org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,667 | INFO  | queue://ActiveMQ.DLQ purged of 0
> > messages
> > > | org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > > 2015-09-28 19:09:04,669 | INFO  | queue://Queue-1438296198618 purged
> of 0
> > > messages | org.apache.activemq.broker.region.Queue | RMI TCP
> > > Connection(8)-130.20.132.42
> > >
> > > On Mon, Sep 28, 2015 at 7:05 PM, Tim Bain <tb...@alumni.duke.edu>
> wrote:
> > >
> > > > Is the producer your last non-daemon thread?  If so, the JVM will
> exit
> > > when
> > > > that thread exits, but you could use a synchronization construct such
> > as
> > > a
> > > > countdown latch or a call to Thread.join() to make the producer
> thread
> > > not
> > > > exit till the consumer does.
> > > > On Sep 28, 2015 7:21 PM, "mfan" <ch...@gmail.com> wrote:
> > > >
> > > > > I am trying to write an application that a producer (with
> PERSISTENT
> > > > > delivery
> > > > > mode) sends 80000 messages to asynchronous a consumer. Both
> producer
> > > and
> > > > > consumer are in acknowledgement mode (AUTO_ACKNOWLEDGE).  After the
> > > > > consumer
> > > > > received a message, it will do some work.  Somehow the producer
> > > producing
> > > > > message faster than consumer did the work, and right after the
> > producer
> > > > > finished sending it's last message, both consumer called destructor
> > > > > automatically.  How to let the consumer continuously getting the
> > > message
> > > > > and
> > > > > do the computation ?  I search online and found maybe due to the
> > > > > memoryLimit. So I check the activemq.xml file in
> > > > > '/home/apache-activemq-5.11.1/data/' directory, I copied partial
> > > related
> > > > to
> > > > > memory paragraph in below which I do not quite understand, I need
> > help
> > > to
> > > > > make sure the memory was not the issue. If that is true, what else
> > > > problem
> > > > > could be ? Thank you for helping.
> > > > >
> > > > >
> > > > > Store limit is 102400 mb (current store usage is 72 mb). The data
> > > > > directory:
> > > > > /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of
> usable
> > > > space
> > > > > -
> > > > > resetting to maximum available disk space: 12990 mb |
> > > > > org.apache.activemq.broker.BrokerService | main
> > > > > 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200
> mb,
> > > > whilst
> > > > > the temporary data directory:
> > > > > /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has
> > 12918
> > > mb
> > > > > of
> > > > > usable space - resetting to maximum available 12918 mb. |
> > > > > org.apache.activemq.broker.BrokerService | main
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > View this message in context:
> > > > >
> > > >
> > >
> >
> http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> > > > > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> > > > >
> > > >
> > >
> >
>

Re: The consumer thread closed right after producer thread finishes sending messages.

Posted by Chaomei Lo <ch...@gmail.com>.
Thanks. Tim. sorry to confuse you, I am doing C++ and the Java broker did
not exit when my c++ application exited.  Yes, I am using one processor
with two threads involved.  I appreciate your suggestions, now I understand
how that CountDownLatch works, that made me realize I modified the
HellowWorld example code wrong.  Now I fixed it then I modified one line of
the example code from doneLatch.await(Millis)
to
doneLatch.await(), and have the user to specify the number of messages and
pass it to consumer for the doneLatch, and it works fine.
But to have a better solution, if I do not want user to input the number of
messages, I still have work to do; I would go with your second suggestion.
First I would create a CountDownLatch object with 2 counts in main() and
then pass this instance cLatch to both consumer and producer.

CountDownLatch cLatch(2);
cLatch.await();

then invoke
cLatch.countDown() in producer code after producer finishes
cLatch.countDown() in consumer code after consumer finishes

But I am wondering how do I know when the consumer would finish.  the
onMessage() will be invoked every time when the message comes in. Without
knowing the number of messages beforehand, do I know when it will be
finished ?  Did I think wrong on what you have suggested ?

Thanks.

On Mon, Sep 28, 2015 at 9:46 PM, Tim Bain <tb...@alumni.duke.edu> wrote:

> Now I'm confused.  Are you doing C++ or Java?  You referenced both .cpp
> files and the JVM, and I don't know how those two go together.  Unless you
> mean that the broker's JVM is exiting when being accessed by C++ clients?
> Also, I assumed (since you haven't clearly said one way or the other) that
> your consumer and your producer are different threads in the same process;
> if that's not true; disregard all of what I've written below, and my
> earlier suggestion.
>
> I think you could do one (and only one) of the following options to prevent
> the producer thread from exiting before the consumer thread (and there are
> others ways that will work, don't feel constrained by them if you have
> another one):
>
>    1. Have the producer thread call consumerThread.join() (which means you
>    have to pass a reference from one thread to the other), which forces the
>    consumer thread to exit before the producer thread is allowed to.
>    2. Have a CountDownLatch initialized to 2, where both threads call
>    countDown() after they've done all of their work, which guarantees that
> the
>    consumer's work is all done before the producer thread exits, even if
> it's
>    still possible for the producer thread to actually exit first (who
> cares).
>    3. Have an AtomicBoolean called something like consumerDone initialized
>    to false and only set to true when the consumer thread has done all of
> its
>    work, and have the producer thread execute the following loop after all
> of
>    its work is done:  while (!consumerDone.get()) {
>    Thread.sleep(someDuration);}
>    4. Execute consumerThread's work in a FutureTask, and call get() on the
>    Future interface it implements to wait until the task is done.
>
> There's no point doing more than one of those approaches; it's duplicative
> and unnecessarily complicates your code.  I'd avoid #1 because creating
> your own threads is generally frowned upon these days; in most cases you
> should be using Executors to run threaded work because it's easier to scale
> them and to handle problems, so I wouldn't recommend choosing an option
> that prevents you from doing that and will force you to rewrite that code
> if you ever wanted to.
>
> Tim
>
> On Sep 28, 2015 8:51 PM, "Chaomei Lo" <ch...@gmail.com> wrote:
>
> > I did have these lines in main.
> >   producerThread.join();
> >   consumerThread.join();
> >
> > and in run() in Consumer.cpp, I have
> >
> >    latch.countDown();
> >
> > and in onMessage() in Consumer.cpp, I have
> >   doneLatch.countDown();
> >
> > Am I doing right ? But I am having a problem to understand what they are
> > for.  I made a mistake on my code, so after fixing, I got about 45000
> > message, and then JVM exited out.  The activemq.log is in below. Can you
> > see anything wrong ? Thank you.
> > --------------------------------------------------------
> > 2015-09-28 19:09:01,412 | INFO  |
> > queue://Consumer.B.VirtualTopic.TestDestination purged of 0 messages |
> > org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:01,414 | INFO  | queue://testBasics1 purged of 0
> messages
> > | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,649 | INFO  | queue://TEST.FOO purged of 47217
> messages
> > | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,651 | INFO  |
> > queue://testSessionCommitAfterConsumerClosed purged of 0 messages |
> > org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,653 | INFO  | queue://testSend1 purged of 0 messages
> |
> > org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,655 | INFO  |
> > queue://Consumer.A.VirtualTopic.TestDestination purged of 0 messages |
> > org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,656 | INFO  | queue://testReceive1 purged of 0
> messages
> > | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,660 | INFO  | queue://Queue-1438296202694 purged of 0
> > messages | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,663 | INFO  | queue://CmsSendWithAsyncCallbackTest
> > purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,665 | INFO  | queue://Queue-1438296199638 purged of 0
> > messages | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,667 | INFO  | queue://ActiveMQ.DLQ purged of 0
> messages
> > | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> > 2015-09-28 19:09:04,669 | INFO  | queue://Queue-1438296198618 purged of 0
> > messages | org.apache.activemq.broker.region.Queue | RMI TCP
> > Connection(8)-130.20.132.42
> >
> > On Mon, Sep 28, 2015 at 7:05 PM, Tim Bain <tb...@alumni.duke.edu> wrote:
> >
> > > Is the producer your last non-daemon thread?  If so, the JVM will exit
> > when
> > > that thread exits, but you could use a synchronization construct such
> as
> > a
> > > countdown latch or a call to Thread.join() to make the producer thread
> > not
> > > exit till the consumer does.
> > > On Sep 28, 2015 7:21 PM, "mfan" <ch...@gmail.com> wrote:
> > >
> > > > I am trying to write an application that a producer (with PERSISTENT
> > > > delivery
> > > > mode) sends 80000 messages to asynchronous a consumer. Both producer
> > and
> > > > consumer are in acknowledgement mode (AUTO_ACKNOWLEDGE).  After the
> > > > consumer
> > > > received a message, it will do some work.  Somehow the producer
> > producing
> > > > message faster than consumer did the work, and right after the
> producer
> > > > finished sending it's last message, both consumer called destructor
> > > > automatically.  How to let the consumer continuously getting the
> > message
> > > > and
> > > > do the computation ?  I search online and found maybe due to the
> > > > memoryLimit. So I check the activemq.xml file in
> > > > '/home/apache-activemq-5.11.1/data/' directory, I copied partial
> > related
> > > to
> > > > memory paragraph in below which I do not quite understand, I need
> help
> > to
> > > > make sure the memory was not the issue. If that is true, what else
> > > problem
> > > > could be ? Thank you for helping.
> > > >
> > > >
> > > > Store limit is 102400 mb (current store usage is 72 mb). The data
> > > > directory:
> > > > /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable
> > > space
> > > > -
> > > > resetting to maximum available disk space: 12990 mb |
> > > > org.apache.activemq.broker.BrokerService | main
> > > > 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb,
> > > whilst
> > > > the temporary data directory:
> > > > /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has
> 12918
> > mb
> > > > of
> > > > usable space - resetting to maximum available 12918 mb. |
> > > > org.apache.activemq.broker.BrokerService | main
> > > >
> > > >
> > > >
> > > > --
> > > > View this message in context:
> > > >
> > >
> >
> http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> > > > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> > > >
> > >
> >
>

Re: The consumer thread closed right after producer thread finishes sending messages.

Posted by Tim Bain <tb...@alumni.duke.edu>.
Now I'm confused.  Are you doing C++ or Java?  You referenced both .cpp
files and the JVM, and I don't know how those two go together.  Unless you
mean that the broker's JVM is exiting when being accessed by C++ clients?
Also, I assumed (since you haven't clearly said one way or the other) that
your consumer and your producer are different threads in the same process;
if that's not true; disregard all of what I've written below, and my
earlier suggestion.

I think you could do one (and only one) of the following options to prevent
the producer thread from exiting before the consumer thread (and there are
others ways that will work, don't feel constrained by them if you have
another one):

   1. Have the producer thread call consumerThread.join() (which means you
   have to pass a reference from one thread to the other), which forces the
   consumer thread to exit before the producer thread is allowed to.
   2. Have a CountDownLatch initialized to 2, where both threads call
   countDown() after they've done all of their work, which guarantees that the
   consumer's work is all done before the producer thread exits, even if it's
   still possible for the producer thread to actually exit first (who cares).
   3. Have an AtomicBoolean called something like consumerDone initialized
   to false and only set to true when the consumer thread has done all of its
   work, and have the producer thread execute the following loop after all of
   its work is done:  while (!consumerDone.get()) {
   Thread.sleep(someDuration);}
   4. Execute consumerThread's work in a FutureTask, and call get() on the
   Future interface it implements to wait until the task is done.

There's no point doing more than one of those approaches; it's duplicative
and unnecessarily complicates your code.  I'd avoid #1 because creating
your own threads is generally frowned upon these days; in most cases you
should be using Executors to run threaded work because it's easier to scale
them and to handle problems, so I wouldn't recommend choosing an option
that prevents you from doing that and will force you to rewrite that code
if you ever wanted to.

Tim

On Sep 28, 2015 8:51 PM, "Chaomei Lo" <ch...@gmail.com> wrote:

> I did have these lines in main.
>   producerThread.join();
>   consumerThread.join();
>
> and in run() in Consumer.cpp, I have
>
>    latch.countDown();
>
> and in onMessage() in Consumer.cpp, I have
>   doneLatch.countDown();
>
> Am I doing right ? But I am having a problem to understand what they are
> for.  I made a mistake on my code, so after fixing, I got about 45000
> message, and then JVM exited out.  The activemq.log is in below. Can you
> see anything wrong ? Thank you.
> --------------------------------------------------------
> 2015-09-28 19:09:01,412 | INFO  |
> queue://Consumer.B.VirtualTopic.TestDestination purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:01,414 | INFO  | queue://testBasics1 purged of 0 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,649 | INFO  | queue://TEST.FOO purged of 47217 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,651 | INFO  |
> queue://testSessionCommitAfterConsumerClosed purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,653 | INFO  | queue://testSend1 purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,655 | INFO  |
> queue://Consumer.A.VirtualTopic.TestDestination purged of 0 messages |
> org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,656 | INFO  | queue://testReceive1 purged of 0 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,660 | INFO  | queue://Queue-1438296202694 purged of 0
> messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,663 | INFO  | queue://CmsSendWithAsyncCallbackTest
> purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,665 | INFO  | queue://Queue-1438296199638 purged of 0
> messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,667 | INFO  | queue://ActiveMQ.DLQ purged of 0 messages
> | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
> 2015-09-28 19:09:04,669 | INFO  | queue://Queue-1438296198618 purged of 0
> messages | org.apache.activemq.broker.region.Queue | RMI TCP
> Connection(8)-130.20.132.42
>
> On Mon, Sep 28, 2015 at 7:05 PM, Tim Bain <tb...@alumni.duke.edu> wrote:
>
> > Is the producer your last non-daemon thread?  If so, the JVM will exit
> when
> > that thread exits, but you could use a synchronization construct such as
> a
> > countdown latch or a call to Thread.join() to make the producer thread
> not
> > exit till the consumer does.
> > On Sep 28, 2015 7:21 PM, "mfan" <ch...@gmail.com> wrote:
> >
> > > I am trying to write an application that a producer (with PERSISTENT
> > > delivery
> > > mode) sends 80000 messages to asynchronous a consumer. Both producer
> and
> > > consumer are in acknowledgement mode (AUTO_ACKNOWLEDGE).  After the
> > > consumer
> > > received a message, it will do some work.  Somehow the producer
> producing
> > > message faster than consumer did the work, and right after the producer
> > > finished sending it's last message, both consumer called destructor
> > > automatically.  How to let the consumer continuously getting the
> message
> > > and
> > > do the computation ?  I search online and found maybe due to the
> > > memoryLimit. So I check the activemq.xml file in
> > > '/home/apache-activemq-5.11.1/data/' directory, I copied partial
> related
> > to
> > > memory paragraph in below which I do not quite understand, I need help
> to
> > > make sure the memory was not the issue. If that is true, what else
> > problem
> > > could be ? Thank you for helping.
> > >
> > >
> > > Store limit is 102400 mb (current store usage is 72 mb). The data
> > > directory:
> > > /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable
> > space
> > > -
> > > resetting to maximum available disk space: 12990 mb |
> > > org.apache.activemq.broker.BrokerService | main
> > > 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb,
> > whilst
> > > the temporary data directory:
> > > /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has 12918
> mb
> > > of
> > > usable space - resetting to maximum available 12918 mb. |
> > > org.apache.activemq.broker.BrokerService | main
> > >
> > >
> > >
> > > --
> > > View this message in context:
> > >
> >
> http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> > > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> > >
> >
>

Re: The consumer thread closed right after producer thread finishes sending messages.

Posted by Chaomei Lo <ch...@gmail.com>.
I did have these lines in main.
  producerThread.join();
  consumerThread.join();

and in run() in Consumer.cpp, I have

   latch.countDown();

and in onMessage() in Consumer.cpp, I have
  doneLatch.countDown();

Am I doing right ? But I am having a problem to understand what they are
for.  I made a mistake on my code, so after fixing, I got about 45000
message, and then JVM exited out.  The activemq.log is in below. Can you
see anything wrong ? Thank you.
--------------------------------------------------------
2015-09-28 19:09:01,412 | INFO  |
queue://Consumer.B.VirtualTopic.TestDestination purged of 0 messages |
org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:01,414 | INFO  | queue://testBasics1 purged of 0 messages
| org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,649 | INFO  | queue://TEST.FOO purged of 47217 messages
| org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,651 | INFO  |
queue://testSessionCommitAfterConsumerClosed purged of 0 messages |
org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,653 | INFO  | queue://testSend1 purged of 0 messages |
org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,655 | INFO  |
queue://Consumer.A.VirtualTopic.TestDestination purged of 0 messages |
org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,656 | INFO  | queue://testReceive1 purged of 0 messages
| org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,660 | INFO  | queue://Queue-1438296202694 purged of 0
messages | org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,663 | INFO  | queue://CmsSendWithAsyncCallbackTest
purged of 0 messages | org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,665 | INFO  | queue://Queue-1438296199638 purged of 0
messages | org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,667 | INFO  | queue://ActiveMQ.DLQ purged of 0 messages
| org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42
2015-09-28 19:09:04,669 | INFO  | queue://Queue-1438296198618 purged of 0
messages | org.apache.activemq.broker.region.Queue | RMI TCP
Connection(8)-130.20.132.42

On Mon, Sep 28, 2015 at 7:05 PM, Tim Bain <tb...@alumni.duke.edu> wrote:

> Is the producer your last non-daemon thread?  If so, the JVM will exit when
> that thread exits, but you could use a synchronization construct such as a
> countdown latch or a call to Thread.join() to make the producer thread not
> exit till the consumer does.
> On Sep 28, 2015 7:21 PM, "mfan" <ch...@gmail.com> wrote:
>
> > I am trying to write an application that a producer (with PERSISTENT
> > delivery
> > mode) sends 80000 messages to asynchronous a consumer. Both producer and
> > consumer are in acknowledgement mode (AUTO_ACKNOWLEDGE).  After the
> > consumer
> > received a message, it will do some work.  Somehow the producer producing
> > message faster than consumer did the work, and right after the producer
> > finished sending it's last message, both consumer called destructor
> > automatically.  How to let the consumer continuously getting the message
> > and
> > do the computation ?  I search online and found maybe due to the
> > memoryLimit. So I check the activemq.xml file in
> > '/home/apache-activemq-5.11.1/data/' directory, I copied partial related
> to
> > memory paragraph in below which I do not quite understand, I need help to
> > make sure the memory was not the issue. If that is true, what else
> problem
> > could be ? Thank you for helping.
> >
> >
> > Store limit is 102400 mb (current store usage is 72 mb). The data
> > directory:
> > /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable
> space
> > -
> > resetting to maximum available disk space: 12990 mb |
> > org.apache.activemq.broker.BrokerService | main
> > 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb,
> whilst
> > the temporary data directory:
> > /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has 12918 mb
> > of
> > usable space - resetting to maximum available 12918 mb. |
> > org.apache.activemq.broker.BrokerService | main
> >
> >
> >
> > --
> > View this message in context:
> >
> http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >
>

Re: The consumer thread closed right after producer thread finishes sending messages.

Posted by Tim Bain <tb...@alumni.duke.edu>.
Is the producer your last non-daemon thread?  If so, the JVM will exit when
that thread exits, but you could use a synchronization construct such as a
countdown latch or a call to Thread.join() to make the producer thread not
exit till the consumer does.
On Sep 28, 2015 7:21 PM, "mfan" <ch...@gmail.com> wrote:

> I am trying to write an application that a producer (with PERSISTENT
> delivery
> mode) sends 80000 messages to asynchronous a consumer. Both producer and
> consumer are in acknowledgement mode (AUTO_ACKNOWLEDGE).  After the
> consumer
> received a message, it will do some work.  Somehow the producer producing
> message faster than consumer did the work, and right after the producer
> finished sending it's last message, both consumer called destructor
> automatically.  How to let the consumer continuously getting the message
> and
> do the computation ?  I search online and found maybe due to the
> memoryLimit. So I check the activemq.xml file in
> '/home/apache-activemq-5.11.1/data/' directory, I copied partial related to
> memory paragraph in below which I do not quite understand, I need help to
> make sure the memory was not the issue. If that is true, what else problem
> could be ? Thank you for helping.
>
>
> Store limit is 102400 mb (current store usage is 72 mb). The data
> directory:
> /home/apache-activemq-5.11.1/data/kahadb only has 12918 mb of usable space
> -
> resetting to maximum available disk space: 12990 mb |
> org.apache.activemq.broker.BrokerService | main
> 2015-09-28 17:25:08,460 | WARN  | Temporary Store limit is 51200 mb, whilst
> the temporary data directory:
> /home/apache-activemq-5.11.1/data/localhost/tmp_storage only has 12918 mb
> of
> usable space - resetting to maximum available 12918 mb. |
> org.apache.activemq.broker.BrokerService | main
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/The-consumer-thread-closed-right-after-producer-thread-finishes-sending-messages-tp4702393.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>