Posted to users@activemq.apache.org by Eugeny N Dzhurinsky <bo...@redwerk.com> on 2008/10/28 17:28:45 UTC

CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Hello!

I am facing a strange issue with the acknowledge mode in ActiveMQ. After
reading the specs, I understood that the broker will never deliver a message
to the consumer unless the consumer acknowledges the previous message. So I
thought it would be enough not to send the acknowledgement to the server in
the onMessage method of the consumer, and to send it later from another
thread that does the actual work. This means I spawn a thread when onMessage
is called and forget about it in this method.

However, I've found this is not true. Below is my simple test case: I start
a single consumer and several producers.

===================================================================================================================
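import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;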

public class TestAcknowledgeMode {

    private static final String DATA = "data";

    private static final String SERVER_QUEUE = "server_queue";

    private static final String CONNECTION_URL = "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";

    private static final int CLIENT_THREADS = 5;

    static ActiveMQConnectionFactory factory;

    Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);

    AtomicInteger successCount = new AtomicInteger(0);

    @BeforeClass
    public static void initFactory() throws Exception {
        factory = new ActiveMQConnectionFactory(CONNECTION_URL);
        BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
                "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
        Logger.getRootLogger().setLevel(Level.DEBUG);
    }

    @Test(timeout = 10000)
    public void testAckDelivery() throws Exception {
        final Connection clientConnection = factory.createConnection();
        clientConnection.start();
        final Connection serverConnection = factory.createConnection();
        serverConnection.start();

        final Session serverSession = serverConnection.createSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
        final MessageConsumer consumer = serverSession
                .createConsumer(serverQueue);
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(final Message message) {
                System.out.println("Got message");
                try {
                    System.out.println(message.getStringProperty(DATA));
                    // message.acknowledge();
                    successCount.incrementAndGet();
                } catch (final Exception e) {
                    e.printStackTrace();
                }
            }

        });
        for (int i = 0; i < CLIENT_THREADS; i++) {
            final Thread t = new Thread(new Runnable() {

                public void run() {
                    try {
                        final Session clientSession = clientConnection
                                .createSession(false,
                                        Session.CLIENT_ACKNOWLEDGE);
                        final Queue clientQueue = clientSession
                                .createQueue(SERVER_QUEUE);
                        final MessageProducer producer = clientSession
                                .createProducer(clientQueue);
                        final Message msg = clientSession.createMessage();
                        msg.setStringProperty(DATA, Thread.currentThread()
                                .getName());
                        System.err.println("Sending data "
                                + Thread.currentThread().getName());
                        producer.send(msg);
                    } catch (final Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
        for (;;) {
            if (successCount.intValue() == CLIENT_THREADS)
                break;
            Thread.sleep(1000);
        }
        Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
    }
}
===================================================================================================================

I expected only one message to be processed by the consumer, with the
remaining messages never delivered to it, but this test shows that all
messages are delivered and the onMessage method is called exactly
CLIENT_THREADS times, which seems to be wrong.

Does this mean the onMessage method will be executed again as soon as the
previous execution finishes, regardless of whether an acknowledgement was
sent back?

And what is the purpose of the message.acknowledge() method?

Thank you in advance!

-- 
Eugene N Dzhurinsky

Re: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Posted by Joe Fernandez <jo...@ttmsolutions.com>.
I think this is because when you set prefetch to 0, your consumer must use
the receive method to 'pull' messages from the broker; the broker will not
'push' messages to the consumer. 

Joe
http://www.ttmsolutions.com - get a free ActiveMQ user guide 
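
A minimal sketch of the 'pull' style mentioned above, for illustration only.
To keep it runnable without a broker, Msg and PullConsumer are hypothetical
stand-ins for javax.jms.Message and javax.jms.MessageConsumer, and
InMemoryConsumer is a toy in-memory queue, not ActiveMQ. The point is the
shape of the loop: the next receive call is made only after the previous
message has been processed and acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PullLoopSketch {

    /** Hypothetical stand-in for javax.jms.Message. */
    interface Msg {
        String body();
        void acknowledge();
    }

    /** Hypothetical stand-in for javax.jms.MessageConsumer; returns null on timeout. */
    interface PullConsumer {
        Msg receive(long timeoutMillis);
    }

    /** Pulls messages one at a time until receive() times out. */
    static List<String> drain(PullConsumer consumer, long timeoutMillis) {
        List<String> processed = new ArrayList<>();
        Msg message;
        while ((message = consumer.receive(timeoutMillis)) != null) {
            processed.add(message.body()); // "process" the message
            message.acknowledge();         // acknowledge before pulling the next one
        }
        return processed;
    }

    /** Toy queue that counts delivered-but-unacknowledged messages. */
    static final class InMemoryConsumer implements PullConsumer {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        int outstanding;    // delivered, not yet acknowledged
        int maxOutstanding; // highest value 'outstanding' ever reached

        void send(String body) {
            queue.add(body);
        }

        @Override
        public Msg receive(long timeoutMillis) {
            String polled;
            try {
                polled = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
            if (polled == null) {
                return null;
            }
            final String body = polled;
            outstanding++;
            maxOutstanding = Math.max(maxOutstanding, outstanding);
            return new Msg() {
                @Override public String body() { return body; }
                @Override public void acknowledge() { outstanding--; }
            };
        }
    }

    public static void main(String[] args) {
        InMemoryConsumer consumer = new InMemoryConsumer();
        consumer.send("first");
        consumer.send("second");
        System.out.println(drain(consumer, 50)); // prints [first, second]
    }
}
```

With a real broker the same loop would call receive(long) on the JMS consumer
and acknowledge() on the JMS message instead of the stand-ins.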





Re: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Posted by Eugeny N Dzhurinsky <bo...@redwerk.com>.
On Wed, Oct 29, 2008 at 05:06:33PM +0000, Rob Davies wrote:
> Hi Eugeny,
> 
> As James stated earlier - you need to set the prefetch - but the value  
> should be 0 - this will make ActiveMQ 'pull' rather than 'push'

This doesn't work:

javax.jms.JMSException: Illegal prefetch size of zero. This setting is not
supported for asynchronous consumers please set a value of at least 1
        at
        org.apache.activemq.ActiveMQMessageConsumer.setMessageListener(ActiveMQMessageConsumer.java:358)

if I set URL to this value:

     private static final String CONNECTION_URL = "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=0";

-- 
Eugene N Dzhurinsky

Re: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Posted by Rob Davies <ra...@gmail.com>.
Hi Eugeny,

As James stated earlier - you need to set the prefetch - but the value  
should be 0 - this will make ActiveMQ 'pull' rather than 'push'

cheers,

Rob

Rob Davies
http://fusesource.com
http://rajdavies.blogspot.com/




Re: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Posted by Eugeny N Dzhurinsky <bo...@redwerk.com>.
On Tue, Oct 28, 2008 at 05:44:26PM +0000, James Strachan wrote:
> 2008/10/28 Eugeny N Dzhurinsky <bo...@redwerk.com>:
> > Hello!
> >
> > I am facing a strange issue with the acknowledge mode in ActiveMQ. After
> > reading the specs, I understood that the broker will never deliver a message
> > to the consumer unless the consumer acknowledges the previous message. So I
> > thought it would be enough not to send the acknowledgement to the server in
> > the onMessage method of the consumer, and to send it later from another
> > thread that does the actual work. This means I spawn a thread when onMessage
> > is called and forget about it in this method.
> You should only be using a JMS session from one thread at once - so
> you should call message.acknowledge() from the consumer thread.

   final MessageConsumer consumer = serverSession
           .createConsumer(serverQueue);
   consumer.setMessageListener(new MessageListener() {

       public void onMessage(final Message message) {
           System.out.println("Got message");
           try {
               System.out.println(message.getStringProperty(DATA));
               // message.acknowledge();
               successCount.incrementAndGet();
           } catch (final Exception e) {
               e.printStackTrace();
           }
       }

   });

Calling or not calling acknowledge doesn't make any difference - as soon as
onMessage returns, the next message is delivered to the consumer.

> If you don't want the message broker to send another message to the
> consumer until it is acknowledged, set prefetch to 1
> http://activemq.apache.org/what-is-the-prefetch-limit-for.html

   private static final String CONNECTION_URL = "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";

I am already doing it this way, as can be seen in the source I originally posted.

So is there a bug in ActiveMQ regarding the acknowledge mode, or am I still
missing something?

-- 
Eugene N Dzhurinsky

Re: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Posted by Vadim Chekan <ko...@gmail.com>.
Sorry, I think I got it wrong. If the message remains intact when the consumer
disconnects, then CLIENT_ACKNOWLEDGE is honored.
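
The behaviour described here can be pictured with a toy model. This is an
illustrative sketch only: ClientAckModel and its methods are hypothetical
names, and it is not how ActiveMQ is implemented. It assumes that messages
delivered but never acknowledged go back to the queue when the consumer
disconnects, and that one acknowledge call covers everything delivered so far
on the session, which is how the javax.jms.Message.acknowledge() Javadoc
describes it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of client acknowledgement; not ActiveMQ code. */
final class ClientAckModel {
    private final Deque<String> pending = new ArrayDeque<>(); // still on the "broker"
    private final List<String> unacked = new ArrayList<>();   // delivered, not acknowledged

    void send(String body) {
        pending.addLast(body);
    }

    /** Delivers the next message, or null if none; it stays tracked until acknowledged. */
    String deliver() {
        String body = pending.pollFirst();
        if (body != null) {
            unacked.add(body);
        }
        return body;
    }

    /** Acknowledges every message delivered so far. */
    void acknowledgeAll() {
        unacked.clear();
    }

    /** Consumer goes away: unacknowledged messages return to the head of the queue, in order. */
    void disconnect() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            pending.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ClientAckModel queue = new ClientAckModel();
        queue.send("m1");
        queue.send("m2");
        queue.deliver();    // m1 delivered but never acknowledged
        queue.disconnect(); // m1 goes back to the queue
        System.out.println(queue.pendingCount()); // prints 2
    }
}
```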





-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT
is explicitly specified

Re: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Posted by Vadim Chekan <ko...@gmail.com>.
James,

Are you saying that the CLIENT_ACKNOWLEDGE option cannot be trusted?

Vadim.




-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT
is explicitly specified

Re: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?

Posted by James Strachan <ja...@gmail.com>.
2008/10/28 Eugeny N Dzhurinsky <bo...@redwerk.com>:
> Hello!
>
> I am facing a strange issue with the acknowledge mode in ActiveMQ. After
> reading the specs, I understood that the broker will never deliver a message
> to the consumer unless the consumer acknowledges the previous message. So I
> thought it would be enough not to send the acknowledgement to the server in
> the onMessage method of the consumer, and to send it later from another
> thread that does the actual work. This means I spawn a thread when onMessage
> is called and forget about it in this method.


You should only be using a JMS session from one thread at once - so
you should call message.acknowledge() from the consumer thread.
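
One way to follow this advice while still doing the work on another thread is
sketched below. It is only an illustration under stated assumptions: Ackable
is a hypothetical stand-in for javax.jms.Message so the example runs without a
broker, and AckOnListenerThread.onMessage plays the role of the body of a
MessageListener. The listener thread hands the work to a worker, waits for it
to finish, and only then acknowledges, so acknowledge() is always called on
the thread that received the message.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AckOnListenerThread {

    /** Hypothetical stand-in for javax.jms.Message; only acknowledge() is modelled. */
    interface Ackable {
        void acknowledge();
    }

    /** Test double that records which thread called acknowledge(). */
    static final class RecordingMessage implements Ackable {
        volatile String ackThread;

        @Override
        public void acknowledge() {
            ackThread = Thread.currentThread().getName();
        }
    }

    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Runs the work on the worker thread and returns that thread's name. */
    String onMessage(Ackable message) {
        Future<String> job = worker.submit(() -> {
            // Long-running processing would go here.
            return Thread.currentThread().getName();
        });
        try {
            String workerThread = job.get(); // block the listener thread until the work is done
            message.acknowledge();           // acknowledge on the listener thread itself
            return workerThread;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("processing failed, message not acknowledged", e);
        }
    }

    void shutdown() {
        worker.shutdown();
    }

    public static void main(String[] args) {
        AckOnListenerThread listener = new AckOnListenerThread();
        RecordingMessage message = new RecordingMessage();
        String workerThread = listener.onMessage(message);
        listener.shutdown();
        System.out.println("worked on " + workerThread + ", acknowledged on " + message.ackThread);
    }
}
```

The trade-off is that the listener blocks while the work runs, so messages are
processed one at a time.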



>
> However I've found this is not true. Below is my simple test case - I am
> starting the single consumer and several producers.
>
> ===================================================================================================================
> import javax.jms.Connection;
>
> public class TestAcknowledgeMode {
>
>    private static final String DATA = "data";
>
>    private static final String SERVER_QUEUE = "server_queue";
>
>    private static final String CONNECTION_URL = "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";
>
>    private static final int CLIENT_THREADS = 5;
>
>    static ActiveMQConnectionFactory factory;
>
>    Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);
>
>    AtomicInteger successCount = new AtomicInteger(0);
>
>    @BeforeClass
>    public static void initFactory() throws Exception {
>        factory = new ActiveMQConnectionFactory(CONNECTION_URL);
>        BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
>                "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
>        Logger.getRootLogger().setLevel(Level.DEBUG);
>    }
>
>    @Test(timeout = 10000)
>    public void testAckDelivery() throws Exception {
>        final Connection clientConnection = factory.createConnection();
>        clientConnection.start();
>        final Connection serverConnection = factory.createConnection();
>        serverConnection.start();
>
>        final Session serverSession = serverConnection.createSession(false,
>                Session.CLIENT_ACKNOWLEDGE);
>        final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
>        final MessageConsumer consumer = serverSession
>                .createConsumer(serverQueue);
>        consumer.setMessageListener(new MessageListener() {
>
>            public void onMessage(final Message message) {
>                System.out.println("Got message");
>                try {
>                    System.out.println(message.getStringProperty(DATA));
>                    // message.acknowledge();
>                    successCount.incrementAndGet();
>                } catch (final Exception e) {
>                    e.printStackTrace();
>                }
>            }
>
>        });
>        for (int i = 0; i < CLIENT_THREADS; i++) {
>            final Thread t = new Thread(new Runnable() {
>
>                public void run() {
>                    try {
>                        final Session clientSession = clientConnection
>                                .createSession(false,
>                                        Session.CLIENT_ACKNOWLEDGE);
>                        final Queue clientQueue = clientSession
>                                .createQueue(SERVER_QUEUE);
>                        final MessageProducer producer = clientSession
>                                .createProducer(clientQueue);
>                        final Message msg = clientSession.createMessage();
>                        msg.setStringProperty(DATA, Thread.currentThread()
>                                .getName());
>                        System.err.println("Sending data "
>                                + Thread.currentThread().getName());
>                        producer.send(msg);
>                    } catch (final Exception e) {
>                        e.printStackTrace();
>                    }
>                }
>            });
>            t.start();
>        }
>        for (;;) {
>            if (successCount.intValue() == CLIENT_THREADS)
>                break;
>            Thread.sleep(1000);
>        }
>        Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
>    }
> }
> ===================================================================================================================
>
> I expected the only one message will be processed by the consumer, so the rest
> messages will never be delivered to it, but this test shows the messages are
> delivered to the consumer and onMessage method is called exactly CLIENT_THREADS times,
> which seems to be wrong.
>
> Does it mean the onMessage method will be executed with no matter of
> acknowledgement is sent back, as soon as previous execution finishes?
>
> And what is the purpose of message.acknowledge() method?

If you don't want the message broker to send another message to the
consumer until it is acknowledged, set prefetch to 1
http://activemq.apache.org/what-is-the-prefetch-limit-for.html

-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://fusesource.com/