You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Arthur Naseef (JIRA)" <ji...@apache.org> on 2014/03/18 23:20:48 UTC

[jira] [Assigned] (AMQ-5107) In-flight queue message redelivered to multiple listeners upon broker shutdown

     [ https://issues.apache.org/jira/browse/AMQ-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arthur Naseef reassigned AMQ-5107:
----------------------------------

    Assignee: Arthur Naseef

> In-flight queue message redelivered to multiple listeners upon broker shutdown
> ------------------------------------------------------------------------------
>
>                 Key: AMQ-5107
>                 URL: https://issues.apache.org/jira/browse/AMQ-5107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.9.0
>         Environment: Windows 7 64Bit - Java "1.6.0_20"
> CentOS 6.0 - Java "1.7.0_09-icedtea" 
>            Reporter: Greg Garlak
>            Assignee: Arthur Naseef
>             Fix For: NEEDS_REVIEW
>
>
> To reproduce: 
> 1) Start 3 or more listener processes (see listener code below)
> 2) Run producer to push one message on queue (see producer code below)
> 3) One of the listeners will pick-up the message and sleep for one minute before auto acknowledging the message
> 4) Start a shutdown sequence of the broker within the 60 second window (Ctrl-C or issue Terminate jvm(int) command from Hawtio console) 
> 5) All other idle listeners should get the same message redelivered simultaneously, each one having deliveryCount incremented 
> Listener code:
> --------------
> package com.test;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestListener {
> 	public static void main(String[] args) {
> 		try {	
> 			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
> 			Connection connection = connectionFactory.createConnection();
> 			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> 			Destination destination = session.createQueue("TEST.QUEUE");
> 			MessageConsumer consumer = session.createConsumer(destination);
> 			
> 			consumer.setMessageListener(new MessageListener() {
> 				public void onMessage(Message message) {
> 					try	{
> 						TextMessage textMessage = (TextMessage) message;
> 						System.out.print("\nReceived " + textMessage.getText());
> 						System.out.print(", Redelivery: " + message.getJMSRedelivered());
> 						System.out.print(", Count: " + message.getLongProperty("JMSXDeliveryCount"));
> 						Thread.sleep(60000);			
> 						System.out.print("... finished after sleep");
> 					} catch (Exception e) {
> 						e.printStackTrace();
> 					}
> 				}
> 			});
> 			
> 			connection.start();
> 		} catch (Exception e) {
> 			e.printStackTrace();
> 		}
> 	}
> 	public TestListener() {
> 		super();
> 	}
> }
> Producer code:
> --------------
> package com.test;
> import java.util.Date;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestProducer {
> 	public static void main(String[] args) {
> 		try {
> 			thread(new HelloWorldProducer(), false);
> 		} catch (Exception e) {
> 			e.printStackTrace();
> 		}
> 	}
>  
> 	public static class HelloWorldProducer implements Runnable {
> 		public void run() {
> 			try {
> 				ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
> 				Connection connection = connectionFactory.createConnection();
> 				connection.start();
> 				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> 				Destination destination = session.createQueue("TEST.QUEUE");
> 				MessageProducer producer = session.createProducer(destination);
> 				String text = "test message created on " + new Date();
> 				TextMessage message = session.createTextMessage(text);
> 				System.out.println("Sent " + text);
> 				producer.send(message);
> 				session.close();
> 				connection.close();
> 			}
> 			catch (Exception e) {
> 				e.printStackTrace();
> 			}
> 		}
> 		public HelloWorldProducer() {}
> 	}
> 	public static void thread(Runnable runnable, boolean daemon) {
> 		Thread brokerThread = new Thread(runnable);
> 		brokerThread.setDaemon(daemon);
> 		brokerThread.start();
> 	}
>     
> 	public TestProducer() {
> 		super();
> 	}
> }



--
This message was sent by Atlassian JIRA
(v6.2#6252)