You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Steven Van Loon <sv...@invenso.com> on 2008/10/13 12:06:10 UTC

Concurrent use of connections

Hi all,

Is anybody aware of possible problems when (re)using a same Connection to activeMQ by different threads? According to the JMS specification, implementations should support concurrent use of Connections, so I created one connection to be used by all consumer / producers.

However, I experienced the problem that from a certain moment, no more messages were read from the queue although a lot of messages were still waiting. When another programs connects to the queue and reads a message, it can still fetch messages. We were able to track down the problem to activeMQ since using another queuing system solved the problem, it kept reading all the messages, no matter how many were waiting on the queue. Eventually, making a connection for each thread solved the problem also for activeMQ. Hence my question.

My apologies I can provide you only with a vague description of the problem and no test case to reproduce the problem but I'm wondering whether anybody else experienced similar problems and can provide me with more insight in the problem.

Thanks!
Steven.


RE: Concurrent use of connections

Posted by Steven Van Loon <sv...@invenso.com>.
I don't think so, I created the session already ThreadLocal since it is known to be thread-unsafe and the problem was still there..



-----Original Message-----
From: Joe Fernandez [mailto:joe.fernandez@ttmsolutions.com]
Sent: maandag 13 oktober 2008 13:06
To: users@activemq.apache.org
Subject: Re: Concurrent use of connections


Sessions are single-threaded. So could the issue be related more to your use
of sessions?

Joe
Get a free ActiveMQ user guide @ http://www.ttmsolutions.com


Steven Van Loon-2 wrote:
>
> Hi all,
>
> Is anybody aware of possible problems when (re)using a same Connection to
> activeMQ by different threads? According to the JMS specification,
> implementations should support concurrent use of Connections, so I created
> one connection to be used by all consumer / producers.
>
> However, I experienced the problem that from a certain moment, no more
> messages were read from the queue although a lot of messages were still
> waiting. When another programs connects to the queue and reads a message,
> it can still fetch messages. We were able to track down the problem to
> activeMQ since using another queuing system solved the problem, it kept
> reading all the messages, no matter how many were waiting on the queue.
> Eventually, making a connection for each thread solved the problem also
> for activeMQ. Hence my question.
>
> My apologies I can provide you only with a vague description of the
> problem and no test case to reproduce the problem but I'm wondering
> whether anybody else experienced similar problems and can provide me with
> more insight in the problem.
>
> Thanks!
> Steven.
>
>
>

--
View this message in context: http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


RE: Concurrent use of connections

Posted by Steven Van Loon <sv...@invenso.com>.
Thanks for the answer, will look at it.



-----Original Message-----
From: James Strachan [mailto:james.strachan@gmail.com]
Sent: dinsdag 14 oktober 2008 10:56
To: users@activemq.apache.org
Subject: Re: Concurrent use of connections

Please see the FAQ entry...
http://activemq.apache.org/how-do-i-use-jms-efficiently.html

you are not meant to create a consumer or producer for each message;
but reuse the same consumer

If you are new to JMS and have not yet grokked how to use it
efficiently, try using the JmsTemplate / MessageListenerContainer
abstractions in Spring along with the JMS pool in ActiveMQ
http://activemq.apache.org/jmstemplate-gotchas.html

Your problem could well be producer flow control kicking in BTW - but
see how things behave when you use JMS correctly

2008/10/14 Steven Van Loon <sv...@invenso.com>:
> Hi,
>
> I managed to reproduce the problem in the simple sample below. What I do is to create a connection that is shared between a consuming thread and a producing thread. The consumer is slower than the producer. When I run this sample, either the consumer stops consuming although lots of messages are available, or the producer stops producing.
>
> Anybody any ideas what's wrong or what I am doing wrong?
>
>
>
> package test.jms;
>
> import java.util.Hashtable;
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueSession;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> public class ConsumingProducer {
>
>        public final static String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
>        public final static String PROVIDER_URL = "tcp://localhost:61616";
>
>        public final static String CONNECTION_FACTORY_NAME = "ConnectionFactory";
>        public final static String DESTINATION_NAME = "TEST.QUEUE";
>
>        public final static long WRITE_DELAY = 10;
>        public final static long READ_DELAY = 1000;
>
>        public static QueueConnectionFactory connectionFactory;
>        public static QueueConnection connection;
>
>        public static void initialize() throws NamingException, JMSException {
>                Hashtable<String, String> env = new Hashtable<String, String>();
>                env
>                                .put(InitialContext.INITIAL_CONTEXT_FACTORY,
>                                                INITIAL_CONTEXT_FACTORY);
>                env.put(InitialContext.PROVIDER_URL, PROVIDER_URL);
>                InitialContext initialContext = new InitialContext(env);
>
>                connectionFactory = (QueueConnectionFactory) initialContext
>                                .lookup(CONNECTION_FACTORY_NAME);
>
>                connection = connectionFactory.createQueueConnection();
>                connection.start();
>        }
>
>        public static void startConsumer() {
>
>                Thread t = new Thread() {
>                        public void run() {
>                                try {
>                                        QueueSession session = connection.createQueueSession(false,
>                                                        Session.AUTO_ACKNOWLEDGE);
>                                        Queue queue = session.createQueue(DESTINATION_NAME);
>                                        while (true) {
>                                                MessageConsumer consumer = session
>                                                                .createConsumer(queue);
>                                                Message msg = consumer.receive();
>                                                if (msg instanceof TextMessage) {
>                                                        System.out.println(((TextMessage) msg).getText());
>                                                } else if (msg != null) {
>                                                        System.out.println("Message received");
>                                                }
>                                                consumer.close();
>                                                sleep(READ_DELAY);
>                                        }
>                                } catch (Exception ex) {
>                                        ex.printStackTrace();
>                                }
>                        }
>                };
>                t.setName("Consumer");
>                t.start();
>        }
>
>        public static void startProducer() {
>
>                Thread t = new Thread() {
>                        public void run() {
>                                try {
>                                        QueueSession session = connection.createQueueSession(false,
>                                                        Session.AUTO_ACKNOWLEDGE);
>                                        Queue queue = session.createQueue(DESTINATION_NAME);
>                                        int msgctr = 1;
>                                        while (true) {
>                                                MessageProducer producer = session
>                                                                .createProducer(queue);
>                                                Message msg = session.createTextMessage("Message "
>                                                                + msgctr++);
>                                                producer.send(msg);
>                                                producer.close();
>
>                                                sleep(WRITE_DELAY);
>                                        }
>                                } catch (Exception ex) {
>                                        ex.printStackTrace();
>                                }
>                        }
>                };
>                t.setName("Producer");
>                t.start();
>        }
>
>        public static void main(String[] args) {
>                try {
>                        initialize();
>                        startConsumer();
>                        startProducer();
>                } catch (Exception ex) {
>                        ex.printStackTrace();
>                }
>        }
> }
>
>
>
>
> -----Original Message-----
> From: Joe Fernandez [mailto:joe.fernandez@ttmsolutions.com]
> Sent: maandag 13 oktober 2008 13:06
> To: users@activemq.apache.org
> Subject: Re: Concurrent use of connections
>
>
> Sessions are single-threaded. So could the issue be related more to your use
> of sessions?
>
> Joe
> Get a free ActiveMQ user guide @ http://www.ttmsolutions.com
>
>
> Steven Van Loon-2 wrote:
>>
>> Hi all,
>>
>> Is anybody aware of possible problems when (re)using a same Connection to
>> activeMQ by different threads? According to the JMS specification,
>> implementations should support concurrent use of Connections, so I created
>> one connection to be used by all consumer / producers.
>>
>> However, I experienced the problem that from a certain moment, no more
>> messages were read from the queue although a lot of messages were still
>> waiting. When another programs connects to the queue and reads a message,
>> it can still fetch messages. We were able to track down the problem to
>> activeMQ since using another queuing system solved the problem, it kept
>> reading all the messages, no matter how many were waiting on the queue.
>> Eventually, making a connection for each thread solved the problem also
>> for activeMQ. Hence my question.
>>
>> My apologies I can provide you only with a vague description of the
>> problem and no test case to reproduce the problem but I'm wondering
>> whether anybody else experienced similar problems and can provide me with
>> more insight in the problem.
>>
>> Thanks!
>> Steven.
>>
>>
>>
>
> --
> View this message in context: http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>



--
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com

Re: Concurrent use of connections

Posted by James Strachan <ja...@gmail.com>.
Please see the FAQ entry...
http://activemq.apache.org/how-do-i-use-jms-efficiently.html

you are not meant to create a consumer or producer for each message;
but reuse the same consumer

If you are new to JMS and have not yet grokked how to use it
efficiently, try using the JmsTemplate / MessageListenerContainer
abstractions in Spring along with the JMS pool in ActiveMQ
http://activemq.apache.org/jmstemplate-gotchas.html

Your problem could well be producer flow control kicking in BTW - but
see how things behave when you use JMS correctly

2008/10/14 Steven Van Loon <sv...@invenso.com>:
> Hi,
>
> I managed to reproduce the problem in the simple sample below. What I do is to create a connection that is shared between a consuming thread and a producing thread. The consumer is slower than the producer. When I run this sample, either the consumer stops consuming although lots of messages are available, or the producer stops producing.
>
> Anybody any ideas what's wrong or what I am doing wrong?
>
>
>
> package test.jms;
>
> import java.util.Hashtable;
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueSession;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> public class ConsumingProducer {
>
>        public final static String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
>        public final static String PROVIDER_URL = "tcp://localhost:61616";
>
>        public final static String CONNECTION_FACTORY_NAME = "ConnectionFactory";
>        public final static String DESTINATION_NAME = "TEST.QUEUE";
>
>        public final static long WRITE_DELAY = 10;
>        public final static long READ_DELAY = 1000;
>
>        public static QueueConnectionFactory connectionFactory;
>        public static QueueConnection connection;
>
>        public static void initialize() throws NamingException, JMSException {
>                Hashtable<String, String> env = new Hashtable<String, String>();
>                env
>                                .put(InitialContext.INITIAL_CONTEXT_FACTORY,
>                                                INITIAL_CONTEXT_FACTORY);
>                env.put(InitialContext.PROVIDER_URL, PROVIDER_URL);
>                InitialContext initialContext = new InitialContext(env);
>
>                connectionFactory = (QueueConnectionFactory) initialContext
>                                .lookup(CONNECTION_FACTORY_NAME);
>
>                connection = connectionFactory.createQueueConnection();
>                connection.start();
>        }
>
>        public static void startConsumer() {
>
>                Thread t = new Thread() {
>                        public void run() {
>                                try {
>                                        QueueSession session = connection.createQueueSession(false,
>                                                        Session.AUTO_ACKNOWLEDGE);
>                                        Queue queue = session.createQueue(DESTINATION_NAME);
>                                        while (true) {
>                                                MessageConsumer consumer = session
>                                                                .createConsumer(queue);
>                                                Message msg = consumer.receive();
>                                                if (msg instanceof TextMessage) {
>                                                        System.out.println(((TextMessage) msg).getText());
>                                                } else if (msg != null) {
>                                                        System.out.println("Message received");
>                                                }
>                                                consumer.close();
>                                                sleep(READ_DELAY);
>                                        }
>                                } catch (Exception ex) {
>                                        ex.printStackTrace();
>                                }
>                        }
>                };
>                t.setName("Consumer");
>                t.start();
>        }
>
>        public static void startProducer() {
>
>                Thread t = new Thread() {
>                        public void run() {
>                                try {
>                                        QueueSession session = connection.createQueueSession(false,
>                                                        Session.AUTO_ACKNOWLEDGE);
>                                        Queue queue = session.createQueue(DESTINATION_NAME);
>                                        int msgctr = 1;
>                                        while (true) {
>                                                MessageProducer producer = session
>                                                                .createProducer(queue);
>                                                Message msg = session.createTextMessage("Message "
>                                                                + msgctr++);
>                                                producer.send(msg);
>                                                producer.close();
>
>                                                sleep(WRITE_DELAY);
>                                        }
>                                } catch (Exception ex) {
>                                        ex.printStackTrace();
>                                }
>                        }
>                };
>                t.setName("Producer");
>                t.start();
>        }
>
>        public static void main(String[] args) {
>                try {
>                        initialize();
>                        startConsumer();
>                        startProducer();
>                } catch (Exception ex) {
>                        ex.printStackTrace();
>                }
>        }
> }
>
>
>
>
> -----Original Message-----
> From: Joe Fernandez [mailto:joe.fernandez@ttmsolutions.com]
> Sent: maandag 13 oktober 2008 13:06
> To: users@activemq.apache.org
> Subject: Re: Concurrent use of connections
>
>
> Sessions are single-threaded. So could the issue be related more to your use
> of sessions?
>
> Joe
> Get a free ActiveMQ user guide @ http://www.ttmsolutions.com
>
>
> Steven Van Loon-2 wrote:
>>
>> Hi all,
>>
>> Is anybody aware of possible problems when (re)using a same Connection to
>> activeMQ by different threads? According to the JMS specification,
>> implementations should support concurrent use of Connections, so I created
>> one connection to be used by all consumer / producers.
>>
>> However, I experienced the problem that from a certain moment, no more
>> messages were read from the queue although a lot of messages were still
>> waiting. When another programs connects to the queue and reads a message,
>> it can still fetch messages. We were able to track down the problem to
>> activeMQ since using another queuing system solved the problem, it kept
>> reading all the messages, no matter how many were waiting on the queue.
>> Eventually, making a connection for each thread solved the problem also
>> for activeMQ. Hence my question.
>>
>> My apologies I can provide you only with a vague description of the
>> problem and no test case to reproduce the problem but I'm wondering
>> whether anybody else experienced similar problems and can provide me with
>> more insight in the problem.
>>
>> Thanks!
>> Steven.
>>
>>
>>
>
> --
> View this message in context: http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>



-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com

RE: Concurrent use of connections

Posted by Steven Van Loon <sv...@invenso.com>.
Hi,

I managed to reproduce the problem in the simple sample below. What I do is to create a connection that is shared between a consuming thread and a producing thread. The consumer is slower than the producer. When I run this sample, either the consumer stops consuming although lots of messages are available, or the producer stops producing.

Anybody any ideas what's wrong or what I am doing wrong?



package test.jms;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ConsumingProducer {

        public final static String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
        public final static String PROVIDER_URL = "tcp://localhost:61616";

        public final static String CONNECTION_FACTORY_NAME = "ConnectionFactory";
        public final static String DESTINATION_NAME = "TEST.QUEUE";

        public final static long WRITE_DELAY = 10;
        public final static long READ_DELAY = 1000;

        public static QueueConnectionFactory connectionFactory;
        public static QueueConnection connection;

        public static void initialize() throws NamingException, JMSException {
                Hashtable<String, String> env = new Hashtable<String, String>();
                env
                                .put(InitialContext.INITIAL_CONTEXT_FACTORY,
                                                INITIAL_CONTEXT_FACTORY);
                env.put(InitialContext.PROVIDER_URL, PROVIDER_URL);
                InitialContext initialContext = new InitialContext(env);

                connectionFactory = (QueueConnectionFactory) initialContext
                                .lookup(CONNECTION_FACTORY_NAME);

                connection = connectionFactory.createQueueConnection();
                connection.start();
        }

        public static void startConsumer() {

                Thread t = new Thread() {
                        public void run() {
                                try {
                                        QueueSession session = connection.createQueueSession(false,
                                                        Session.AUTO_ACKNOWLEDGE);
                                        Queue queue = session.createQueue(DESTINATION_NAME);
                                        while (true) {
                                                MessageConsumer consumer = session
                                                                .createConsumer(queue);
                                                Message msg = consumer.receive();
                                                if (msg instanceof TextMessage) {
                                                        System.out.println(((TextMessage) msg).getText());
                                                } else if (msg != null) {
                                                        System.out.println("Message received");
                                                }
                                                consumer.close();
                                                sleep(READ_DELAY);
                                        }
                                } catch (Exception ex) {
                                        ex.printStackTrace();
                                }
                        }
                };
                t.setName("Consumer");
                t.start();
        }

        public static void startProducer() {

                Thread t = new Thread() {
                        public void run() {
                                try {
                                        QueueSession session = connection.createQueueSession(false,
                                                        Session.AUTO_ACKNOWLEDGE);
                                        Queue queue = session.createQueue(DESTINATION_NAME);
                                        int msgctr = 1;
                                        while (true) {
                                                MessageProducer producer = session
                                                                .createProducer(queue);
                                                Message msg = session.createTextMessage("Message "
                                                                + msgctr++);
                                                producer.send(msg);
                                                producer.close();

                                                sleep(WRITE_DELAY);
                                        }
                                } catch (Exception ex) {
                                        ex.printStackTrace();
                                }
                        }
                };
                t.setName("Producer");
                t.start();
        }

        public static void main(String[] args) {
                try {
                        initialize();
                        startConsumer();
                        startProducer();
                } catch (Exception ex) {
                        ex.printStackTrace();
                }
        }
}




-----Original Message-----
From: Joe Fernandez [mailto:joe.fernandez@ttmsolutions.com]
Sent: maandag 13 oktober 2008 13:06
To: users@activemq.apache.org
Subject: Re: Concurrent use of connections


Sessions are single-threaded. So could the issue be related more to your use
of sessions?

Joe
Get a free ActiveMQ user guide @ http://www.ttmsolutions.com


Steven Van Loon-2 wrote:
>
> Hi all,
>
> Is anybody aware of possible problems when (re)using a same Connection to
> activeMQ by different threads? According to the JMS specification,
> implementations should support concurrent use of Connections, so I created
> one connection to be used by all consumer / producers.
>
> However, I experienced the problem that from a certain moment, no more
> messages were read from the queue although a lot of messages were still
> waiting. When another programs connects to the queue and reads a message,
> it can still fetch messages. We were able to track down the problem to
> activeMQ since using another queuing system solved the problem, it kept
> reading all the messages, no matter how many were waiting on the queue.
> Eventually, making a connection for each thread solved the problem also
> for activeMQ. Hence my question.
>
> My apologies I can provide you only with a vague description of the
> problem and no test case to reproduce the problem but I'm wondering
> whether anybody else experienced similar problems and can provide me with
> more insight in the problem.
>
> Thanks!
> Steven.
>
>
>

--
View this message in context: http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Concurrent use of connections

Posted by Joe Fernandez <jo...@ttmsolutions.com>.
Sessions are single-threaded. So could the issue be related more to your use
of sessions?

Joe
Get a free ActiveMQ user guide @ http://www.ttmsolutions.com


Steven Van Loon-2 wrote:
> 
> Hi all,
> 
> Is anybody aware of possible problems when (re)using a same Connection to
> activeMQ by different threads? According to the JMS specification,
> implementations should support concurrent use of Connections, so I created
> one connection to be used by all consumer / producers.
> 
> However, I experienced the problem that from a certain moment, no more
> messages were read from the queue although a lot of messages were still
> waiting. When another programs connects to the queue and reads a message,
> it can still fetch messages. We were able to track down the problem to
> activeMQ since using another queuing system solved the problem, it kept
> reading all the messages, no matter how many were waiting on the queue.
> Eventually, making a connection for each thread solved the problem also
> for activeMQ. Hence my question.
> 
> My apologies I can provide you only with a vague description of the
> problem and no test case to reproduce the problem but I'm wondering
> whether anybody else experienced similar problems and can provide me with
> more insight in the problem.
> 
> Thanks!
> Steven.
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.