Posted to dev@activemq.apache.org by Marjan <st...@mt.net.mk> on 2008/07/09 08:26:05 UTC

ActiveMQ message delivery

I'm testing ActiveMQ 5.1 and I found some strange behaviour. There is a single
message producer firing 10 messages in 1 second and a message consumer that
listens on the same destination. Here is the code:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author Marjan Sterjev
 *
 */
public class JmsProducer {

    public static void main(String[] args) throws Throwable {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test.queue");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // Messages expire 10 seconds after they are sent.
        producer.setTimeToLive(10000);
        int msgCount = 10;
        int sleep = 100;
        String payload = "Test Message";
        // Send 10 messages, 100 ms apart (about 1 second per run).
        for (int i = 0; i < msgCount; i++) {
            ObjectMessage message = session.createObjectMessage();
            String p = String.format("%s:%d", payload, (i + 1));
            message.setObject(p);
            producer.send(message);
            Thread.sleep(sleep);
        }
        producer.close();
        session.close();
        connection.close();
    }
}

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author Marjan Sterjev
 *
 */
public class JmsConsumer implements MessageListener {

    private int messagesReceived;

    public JmsConsumer() throws Throwable {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test.queue");
        MessageConsumer consumer = session.createConsumer(destination);
        // Messages are delivered asynchronously to onMessage().
        consumer.setMessageListener(this);
    }

    public void onMessage(Message message) {
        ObjectMessage objectMessage = (ObjectMessage) message;
        try {
            // Print a running count followed by the payload.
            System.out.println(String.format("[%d]:%s", (++messagesReceived),
                    objectMessage.getObject().toString()));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Throwable {
        new JmsConsumer();
    }

}

I'm using the default persistence adapter:

<persistenceAdapter>
    <amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/data" maxFileLength="20 mb"/>
</persistenceAdapter>

Note that the message expiration time (time-to-live) is set to 10 seconds.
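
To make the timing concrete, here is a minimal sketch of the expiration
arithmetic. JMS defines a message's expiration as its send time plus the
producer's time-to-live. The sketch is plain Java with no broker involved;
the class and method names are hypothetical, send times are modelled as
millisecond offsets from the start of a burst, and the treatment of the exact
expiration instant is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of JMS or ActiveMQ: it models the arithmetic only.
class ExpirySketch {

    // Expiration is send time plus time-to-live; a TTL of 0 means "never expires".
    static long expirationMillis(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // Expiration times for a burst of msgCount messages sent sleepMillis apart.
    static List<Long> burstExpirations(long startMillis, int msgCount,
                                       long sleepMillis, long ttlMillis) {
        List<Long> expirations = new ArrayList<Long>();
        for (int i = 0; i < msgCount; i++) {
            expirations.add(expirationMillis(startMillis + i * sleepMillis, ttlMillis));
        }
        return expirations;
    }

    // Counts messages not yet expired at nowMillis.
    // Assumption: a message counts as expired once nowMillis reaches its expiration.
    static int liveAt(List<Long> expirations, long nowMillis) {
        int live = 0;
        for (long e : expirations) {
            if (e == 0 || nowMillis < e) {
                live++;
            }
        }
        return live;
    }

    public static void main(String[] args) {
        // Same numbers as the producer above: 10 messages, 100 ms apart, TTL 10000 ms.
        List<Long> exp = burstExpirations(0, 10, 100, 10000);
        System.out.println("first expires at " + exp.get(0) + " ms, last at "
                + exp.get(9) + " ms");
        System.out.println("still live at 10550 ms: " + liveAt(exp, 10550));
    }
}
```

With these numbers the messages of one burst do not expire together: the
first expires about 10 seconds after the burst starts and the last about 0.9
seconds later, so a consumer started in that window would see only part of
the burst.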

If the message consumer is up all the time there is no problem; all messages
are received promptly. I see a problem if the message consumer is down and some
of the messages expire. In that case the message store behaves strangely.
When the consumer is started again and we run the producer multiple times,
there are cases when the last message is not delivered to the consumer. I
found that the message will be delivered if additional messages are sent to
the queue or the message consumer is restarted. This behaviour occurs randomly.

In order to reproduce the scenario, follow these steps:

1. Start the consumer.
2. Run the producer multiple times. All messages will be delivered as expected.
3. Stop the consumer.
4. Run the producer and wait more than 10 seconds, so that the messages
expire.
5. Start the consumer.
6. Run the producer multiple times (do not give up if everything is OK after
several runs). You should see that in some producer bursts the last (10th)
message is not delivered.
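
When repeating step 6 many times, it is easy to overlook a missing message in
the console output. The following is a minimal sketch of a checker for the
payloads the consumer prints; it is not part of the original test programs,
and the class and method names are hypothetical. It assumes payloads of the
form "Test Message:N", as produced above, and counts how often each sequence
number was seen across a known number of producer runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical checker for the test above; not part of JMS or ActiveMQ.
class BurstGapCheck {

    // Extracts N from a payload of the form "Test Message:N".
    static int sequenceOf(String payload) {
        return Integer.parseInt(payload.substring(payload.lastIndexOf(':') + 1).trim());
    }

    // Returns the sequence numbers in 1..msgCount that were received
    // fewer than `bursts` times, i.e. messages that are still outstanding.
    static List<Integer> missing(List<String> payloads, int msgCount, int bursts) {
        int[] seen = new int[msgCount + 1];
        for (String p : payloads) {
            int seq = sequenceOf(p);
            if (seq >= 1 && seq <= msgCount) {
                seen[seq]++;
            }
        }
        List<Integer> result = new ArrayList<Integer>();
        for (int seq = 1; seq <= msgCount; seq++) {
            if (seen[seq] < bursts) {
                result.add(seq);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Simulate one producer run in which the 10th message never arrived.
        List<String> received = new ArrayList<String>();
        for (int i = 1; i <= 9; i++) {
            received.add("Test Message:" + i);
        }
        System.out.println("missing: " + missing(received, 10, 1));
    }
}
```

In the consumer, the payloads could be collected in onMessage() and passed to
missing() a few seconds after each producer run; a non-empty result that
contains 10 would correspond to the behaviour described in step 6.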

I was monitoring the web administration console. With the store corrupted in
this way, I sometimes see negative numbers for the number of pending messages,
which is a clear bug in the application.

If I use syncOnWrite="true" on amqPersistenceAdapter, the problem goes away.
The problem does not exist with jdbcPersistenceAdapter.

My operating system is Windows XP. 

Any comments? 

Regards

-- 
View this message in context: http://www.nabble.com/ActiveMQ-message-delivery-tp18355245p18355245.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: ActiveMQ message delivery

Posted by Rob Davies <ra...@gmail.com>.
Great! - thanks for letting us know

On 10 Jul 2008, at 07:34, Marjan wrote:

>
> Yes, it works now.
>
> Thanks
>
>


Re: ActiveMQ message delivery

Posted by Marjan <st...@mt.net.mk>.
Yes, it works now.

Thanks


rajdavies wrote:
> 
> that problem with the class cast exception was fixed today - might be  
> worth looking tomorrow
> 

-- 
View this message in context: http://www.nabble.com/ActiveMQ-message-delivery-tp18355245p18376799.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: ActiveMQ message delivery

Posted by Rob Davies <ra...@gmail.com>.
that problem with the class cast exception was fixed today - might be  
worth looking tomorrow

On 9 Jul 2008, at 14:07, Marjan wrote:

>
> I tried the 5.2 SNAPSHOT. By the way, spring-aop.jar is missing from the
> distribution. I added it by myself. With this version the things are even
> worse. If some message expires, the consumer can't read and the producer
> can't produce messages. This is the stack trace on the server:
>
> ERROR Queue                          - Failed to page in more queue messages
> java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
>        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1045)
>        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1037)
>        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1117)
>        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1196)
>        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:951)
>        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
>        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>        at java.lang.Thread.run(Thread.java:595)
> ERROR Queue                          - Failed to page in more queue messages
> java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
>        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1045)
>        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1037)
>        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1117)
>        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1196)
>        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:951)
>        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
>        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>        at java.lang.Thread.run(Thread.java:595)
> ERROR Queue                          - Failed to page in more queue messages
> java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
>        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1045)
>        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1037)
>        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1117)
>        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1196)
>        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:951)
>        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
>        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>        at java.lang.Thread.run(Thread.java:595)
>
> Can we expect that this will be fixed in the near future? It is not trivial
> and message expiration mechanism is one of the basic JMS principles.
>
> Regards
> -- 
> View this message in context: http://www.nabble.com/ActiveMQ-message-delivery-tp18355245p18361191.html
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>


Re: ActiveMQ message delivery

Posted by Marjan <st...@mt.net.mk>.
I tried the 5.2 SNAPSHOT. By the way, spring-aop.jar is missing from the
distribution. I added it by myself. With this version the things are even
worse. If some message expires, the consumer can't read and the producer
can't produce messages. This is the stack trace on the server:

ERROR Queue                          - Failed to page in more queue messages
java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1045)
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1037)
        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1117)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1196)
        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:951)
        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)
ERROR Queue                          - Failed to page in more queue messages
java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1045)
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1037)
        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1117)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1196)
        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:951)
        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)
ERROR Queue                          - Failed to page in more queue messages
java.lang.ClassCastException: org.apache.activemq.command.ActiveMQObjectMessage
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1045)
        at org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1037)
        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1117)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1196)
        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:951)
        at org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
        at org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)

Can we expect that this will be fixed in the near future? It is not trivial
and message expiration mechanism is one of the basic JMS principles.

Regards
-- 
View this message in context: http://www.nabble.com/ActiveMQ-message-delivery-tp18355245p18361191.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: ActiveMQ message delivery

Posted by Rob Davies <ra...@gmail.com>.
Hi Marjan,

could you try with the latest 5.2-snapshot - I think this has been  
resolved recently.
Let me know how you get on!

cheers,

Rob
