You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Bruce Snyder (JIRA)" <ji...@apache.org> on 2010/09/18 18:47:57 UTC

[jira] Updated: (AMQ-2648) Interrupting Consumer.close() thread puts queue into unusable state

     [ https://issues.apache.org/activemq/browse/AMQ-2648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruce Snyder updated AMQ-2648:
------------------------------

    Fix Version/s: 5.5.0
                       (was: 5.4.1)

> Interrupting Consumer.close() thread puts queue into unusable state
> -------------------------------------------------------------------
>
>                 Key: AMQ-2648
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2648
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.2.0
>            Reporter: Parasoft Corporation
>             Fix For: 5.5.0
>
>
> We have built a client program for sending and receiving JMS messages. Each send/receive operation is performed in a thread, so that we can handle timeouts properly. However, if the thread which is performing the receive() gets interrupted, the queue no longer responds to receive() requests, even from another client with a separate JVM.
> To reproduce, use two separate programs:
> -----------[QueueSendReceiveActiveMQInterrupt.java]--------
> import java.util.*;
> import javax.jms.*;
> import javax.naming.*;
> public class QueueSendReceiveActiveMQInterrupt implements MessageListener {
>     public static void main(String[] args) throws Exception {
>         useConnectionFactory();
>     }
>     private static void useConnectionFactory() throws Exception, JMSException {
>         ConnectionFactory factory = getConnectionFactoryUsingJNDI();
>         Connection connect = null;
>         Session session = null;
>         connect = factory.createConnection(/*"Admin", "Admin"*/);
>         session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Destination entryDest = session.createQueue("soatest.demo.queue");
>         Destination exitDest = entryDest;
>         MessageProducer producer = session.createProducer(entryDest);
>         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>         MessageConsumer consumer = session.createConsumer(exitDest);
>         connect.start();
>         TextMessage txtMessage = session.createTextMessage();
>         txtMessage.setJMSReplyTo(exitDest);
>         txtMessage.setText("Hello 1 from standalone program!");
>         producer.send(txtMessage);
>         System.out.println("message 1 sent");
>         Message msg;
>         
>         // threaded receive with a kill
>         ReceiverRunner runner = (new QueueSendReceiveActiveMQInterrupt()).new ReceiverRunner(consumer);
>         Thread t = new Thread(runner);
>         t.setDaemon(true);
>         t.start();
>         t.join(1000);
>         t.interrupt();
>         msg = runner.getMessage();
>         if (msg != null) {
>             System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
>         } else {
>             System.out.println("got no message 1");
>         }
>         
>         producer.close();
> //        consumer.close();
> //        session.close();
> //        connect.close();
>     }
>     private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception {
>         Object ret = null;
>         Properties props = new Properties();
>         props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
>         props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>         InitialContext ictx = new javax.naming.InitialContext(props);
>         Object obj = ictx.lookup("QueueConnectionFactory");
>         if (obj instanceof Reference) {
>             Reference ref = (Reference)obj;
>             String className = ref.getClassName();
>             System.out.println("Connection factory class name: " + className);
>             Class cls = Class.forName(className);
>             ret = cls.newInstance();
>         } else {
>             ret = obj;
>         }
>         ictx.close();
>         return (ConnectionFactory)ret;
>     }
>     public void onMessage(Message msg) {
>         if (msg != null) {
>             try {
>                 System.out.println("msg = " + ((TextMessage)msg).getText());
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             }
>         } else {
>             System.out.println("got nothing");
>         }
>     }
>     public class ReceiverRunner implements Runnable {
>         private MessageConsumer consumer;
>         private Message msg;
>         
>         public ReceiverRunner(MessageConsumer consumer) {
>             this.consumer = consumer;
>         }
>         public void run() {
>             try {
>                 msg = consumer.receive(500);
>                 // change the following to a very small amount like 500 and notice how everything works
>                 consumer.receive(10000); // another receive just so it blocks and get the thread to stop
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             } finally {
>                 try {
>                     consumer.close();
>                 } catch (JMSException e) {
>                     e.printStackTrace();
>                 }
>             }
>         }
>         public Message getMessage() {
>             return msg;
>         }
>     }
> }
> --
> -----------[QueueSendReceiveActiveMQ.java]--------
> import java.util.*;
> import javax.jms.*;
> import javax.naming.*;
> public class QueueSendReceiveActiveMQ implements MessageListener {
>     public static void main(String[] args) throws Exception {
>         useConnectionFactory();
>     }
>     private static void useConnectionFactory() throws Exception, JMSException {
>         ConnectionFactory factory = getConnectionFactoryUsingJNDI();
>         Connection connect = null;
>         Session session = null;
>         connect = factory.createConnection(/*"Admin", "Admin"*/);
>         session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Destination entryDest = session.createQueue("soatest.demo.queue");
>         Destination exitDest = entryDest;
>         MessageProducer producer = session.createProducer(entryDest);
>         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>         MessageConsumer consumer = session.createConsumer(exitDest);
>         connect.start();
>         TextMessage txtMessage = session.createTextMessage();
>         txtMessage.setJMSReplyTo(exitDest);
>         txtMessage.setText("without thread interrupt: Hello 1 from standalone program!");
>         producer.send(txtMessage);
>         System.out.println("without thread interrupt: message 1 sent");
>         Message msg;
>         // regular receive
>         msg = consumer.receive(2000);
>         
>         if (msg != null) {
>             System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
>         } else {
>             System.out.println("without thread interrupt: got no message 1");
>         }
>         
>         producer.close();
>         consumer.close();
>         session.close();
>         connect.close();
>         
>     }
>     private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception {
>         Object ret = null;
>         Properties props = new Properties();
>         props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
>         props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>         InitialContext ictx = new javax.naming.InitialContext(props);
>         Object obj = ictx.lookup("QueueConnectionFactory");
>         if (obj instanceof Reference) {
>             Reference ref = (Reference)obj;
>             String className = ref.getClassName();
>             System.out.println("Connection factory class name: " + className);
>             Class cls = Class.forName(className);
>             ret = cls.newInstance();
>         } else {
>             ret = obj;
>         }
>         ictx.close();
>         return (ConnectionFactory)ret;
>     }
>     public void onMessage(Message msg) {
>         if (msg != null) {
>             try {
>                 System.out.println("msg = " + ((TextMessage)msg).getText());
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             }
>         } else {
>             System.out.println("got nothing");
>         }
>     }
> }
> --
> 1) Run QueueSendReceiveActiveMQ alone and notice that it successfully sends messages to and receives messages from the queue.
> 2) Run QueueSendReceiveActiveMQInterrupt; the program will hang instead of exiting (due to some non-daemon thread created by ActiveMQ). While it is still running, run QueueSendReceiveActiveMQ and notice how it fails to retrieve messages from the queue. If JMS Consumer.close() is executed in a thread that is interrupted, it fails, throws an exception, and leaves the consumer in some bad state.
> Note that the same code does not exhibit this behavior when using other vendors' MQ solutions.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.