You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Arthur Naseef (JIRA)" <ji...@apache.org> on 2014/03/18 23:20:48 UTC
[jira] [Assigned] (AMQ-5107) In-flight queue message redelivered to
multiple listeners upon broker shutdown
[ https://issues.apache.org/jira/browse/AMQ-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arthur Naseef reassigned AMQ-5107:
----------------------------------
Assignee: Arthur Naseef
> In-flight queue message redelivered to multiple listeners upon broker shutdown
> ------------------------------------------------------------------------------
>
> Key: AMQ-5107
> URL: https://issues.apache.org/jira/browse/AMQ-5107
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.9.0
> Environment: Windows 7 64Bit - Java "1.6.0_20"
> CentOS 6.0 - Java "1.7.0_09-icedtea"
> Reporter: Greg Garlak
> Assignee: Arthur Naseef
> Fix For: NEEDS_REVIEW
>
>
> To reproduce:
> 1) Start 3 or more listener processes (see listener code below)
> 2) Run producer to push one message on queue (see producer code below)
> 3) One of the listeners will pick-up the message and sleep for one minute before auto acknowledging the message
> 4) Start a shutdown sequence of the broker within the 60 second window (Ctrl-C or issue Terminate jvm(int) command from Hawtio console)
> 5) All other idle listeners should get the same message redelivered simultaneously, each one having deliveryCount incremented
> Listener code:
> --------------
> package com.test;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestListener {
> public static void main(String[] args) {
> try {
> ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
> Connection connection = connectionFactory.createConnection();
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Destination destination = session.createQueue("TEST.QUEUE");
> MessageConsumer consumer = session.createConsumer(destination);
>
> consumer.setMessageListener(new MessageListener() {
> public void onMessage(Message message) {
> try {
> TextMessage textMessage = (TextMessage) message;
> System.out.print("\nReceived " + textMessage.getText());
> System.out.print(", Redelivery: " + message.getJMSRedelivered());
> System.out.print(", Count: " + message.getLongProperty("JMSXDeliveryCount"));
> Thread.sleep(60000);
> System.out.print("... finished after sleep");
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> });
>
> connection.start();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> public TestListener() {
> super();
> }
> }
> Producer code:
> --------------
> package com.test;
> import java.util.Date;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestProducer {
> public static void main(String[] args) {
> try {
> thread(new HelloWorldProducer(), false);
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>
> public static class HelloWorldProducer implements Runnable {
> public void run() {
> try {
> ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
> Connection connection = connectionFactory.createConnection();
> connection.start();
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Destination destination = session.createQueue("TEST.QUEUE");
> MessageProducer producer = session.createProducer(destination);
> String text = "test message created on " + new Date();
> TextMessage message = session.createTextMessage(text);
> System.out.println("Sent " + text);
> producer.send(message);
> session.close();
> connection.close();
> }
> catch (Exception e) {
> e.printStackTrace();
> }
> }
> public HelloWorldProducer() {}
> }
> public static void thread(Runnable runnable, boolean daemon) {
> Thread brokerThread = new Thread(runnable);
> brokerThread.setDaemon(daemon);
> brokerThread.start();
> }
>
> public TestProducer() {
> super();
> }
> }
--
This message was sent by Atlassian JIRA
(v6.2#6252)