You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2011/07/12 21:55:00 UTC
[jira] [Closed] (AMQ-2648) Interrupting Consumer.close() thread
puts queue into unusable state
[ https://issues.apache.org/jira/browse/AMQ-2648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish closed AMQ-2648.
-----------------------------
Resolution: Cannot Reproduce
Running the provided test the message is received and the connection is shut down when the receive call is interrupted, the program exists normally.
> Interrupting Consumer.close() thread puts queue into unusable state
> -------------------------------------------------------------------
>
> Key: AMQ-2648
> URL: https://issues.apache.org/jira/browse/AMQ-2648
> Project: ActiveMQ
> Issue Type: Bug
> Components: JMS client
> Affects Versions: 5.2.0
> Reporter: Parasoft Corporation
> Fix For: 5.6.0
>
>
> We have built a client program for sending and receiving JMS messages. Each send/receive operation is performed in a thread, so that we can handle timeouts properly. However, if the thread which is performing the receive() gets interrupted, the queue no longer responds to receive() requests, even from another client with a separate JVM.
> To reproduce, use two separate programs:
> -----------[QueueSendReceiveActiveMQInterrupt.java]--------
> import java.util.*;
> import javax.jms.*;
> import javax.naming.*;
> public class QueueSendReceiveActiveMQInterrupt implements MessageListener {
> public static void main(String[] args) throws Exception {
> useConnectionFactory();
> }
> private static void useConnectionFactory() throws Exception, JMSException {
> ConnectionFactory factory = getConnectionFactoryUsingJNDI();
> Connection connect = null;
> Session session = null;
> connect = factory.createConnection(/*"Admin", "Admin"*/);
> session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Destination entryDest = session.createQueue("soatest.demo.queue");
> Destination exitDest = entryDest;
> MessageProducer producer = session.createProducer(entryDest);
> producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
> MessageConsumer consumer = session.createConsumer(exitDest);
> connect.start();
> TextMessage txtMessage = session.createTextMessage();
> txtMessage.setJMSReplyTo(exitDest);
> txtMessage.setText("Hello 1 from standalone program!");
> producer.send(txtMessage);
> System.out.println("message 1 sent");
> Message msg;
>
> // threaded receive with a kill
> ReceiverRunner runner = (new QueueSendReceiveActiveMQInterrupt()).new ReceiverRunner(consumer);
> Thread t = new Thread(runner);
> t.setDaemon(true);
> t.start();
> t.join(1000);
> t.interrupt();
> msg = runner.getMessage();
> if (msg != null) {
> System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
> } else {
> System.out.println("got no message 1");
> }
>
> producer.close();
> // consumer.close();
> // session.close();
> // connect.close();
> }
> private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception {
> Object ret = null;
> Properties props = new Properties();
> props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
> props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
> InitialContext ictx = new javax.naming.InitialContext(props);
> Object obj = ictx.lookup("QueueConnectionFactory");
> if (obj instanceof Reference) {
> Reference ref = (Reference)obj;
> String className = ref.getClassName();
> System.out.println("Connection factory class name: " + className);
> Class cls = Class.forName(className);
> ret = cls.newInstance();
> } else {
> ret = obj;
> }
> ictx.close();
> return (ConnectionFactory)ret;
> }
> public void onMessage(Message msg) {
> if (msg != null) {
> try {
> System.out.println("msg = " + ((TextMessage)msg).getText());
> } catch (JMSException e) {
> e.printStackTrace();
> }
> } else {
> System.out.println("got nothing");
> }
> }
> public class ReceiverRunner implements Runnable {
> private MessageConsumer consumer;
> private Message msg;
>
> public ReceiverRunner(MessageConsumer consumer) {
> this.consumer = consumer;
> }
> public void run() {
> try {
> msg = consumer.receive(500);
> // change the following to a very small amount like 500 and notice how everything works
> consumer.receive(10000); // another receive just so it blocks and get the thread to stop
> } catch (JMSException e) {
> e.printStackTrace();
> } finally {
> try {
> consumer.close();
> } catch (JMSException e) {
> e.printStackTrace();
> }
> }
> }
> public Message getMessage() {
> return msg;
> }
> }
> }
> --
> -----------[QueueSendReceiveActiveMQ.java]--------
> import java.util.*;
> import javax.jms.*;
> import javax.naming.*;
> public class QueueSendReceiveActiveMQ implements MessageListener {
> public static void main(String[] args) throws Exception {
> useConnectionFactory();
> }
> private static void useConnectionFactory() throws Exception, JMSException {
> ConnectionFactory factory = getConnectionFactoryUsingJNDI();
> Connection connect = null;
> Session session = null;
> connect = factory.createConnection(/*"Admin", "Admin"*/);
> session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Destination entryDest = session.createQueue("soatest.demo.queue");
> Destination exitDest = entryDest;
> MessageProducer producer = session.createProducer(entryDest);
> producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
> MessageConsumer consumer = session.createConsumer(exitDest);
> connect.start();
> TextMessage txtMessage = session.createTextMessage();
> txtMessage.setJMSReplyTo(exitDest);
> txtMessage.setText("without thread interrupt: Hello 1 from standalone program!");
> producer.send(txtMessage);
> System.out.println("without thread interrupt: message 1 sent");
> Message msg;
> // regular receive
> msg = consumer.receive(2000);
>
> if (msg != null) {
> System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
> } else {
> System.out.println("without thread interrupt: got no message 1");
> }
>
> producer.close();
> consumer.close();
> session.close();
> connect.close();
>
> }
> private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception {
> Object ret = null;
> Properties props = new Properties();
> props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
> props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
> InitialContext ictx = new javax.naming.InitialContext(props);
> Object obj = ictx.lookup("QueueConnectionFactory");
> if (obj instanceof Reference) {
> Reference ref = (Reference)obj;
> String className = ref.getClassName();
> System.out.println("Connection factory class name: " + className);
> Class cls = Class.forName(className);
> ret = cls.newInstance();
> } else {
> ret = obj;
> }
> ictx.close();
> return (ConnectionFactory)ret;
> }
> public void onMessage(Message msg) {
> if (msg != null) {
> try {
> System.out.println("msg = " + ((TextMessage)msg).getText());
> } catch (JMSException e) {
> e.printStackTrace();
> }
> } else {
> System.out.println("got nothing");
> }
> }
> }
> --
> 1) Run QueueSendReceiveActiveMQ alone, notice how it works in sending receiving messages from the queue.
> 2) Run QueueSendReceiveActiveMQInterrupt will result in the program halting (due to some non-daemon thread created by ActiveMQ), then while it is running run QueueSendReceiveActiveMQ and notice how it fails to retrieve messages from the queue. If JMS Consumer.close() is excuted in a thread that is interrupted, it fails and throws an exception and leaves the consumer in some bad state.
> Note that the same code does not exhibit this behavior when using other vendors' MQ solutions.
--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira