You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Albert Strasheim <13...@sun.ac.za> on 2007/01/31 13:02:25 UTC
Client getting stuck when stopping connection from message listener
Hello all
We've run into a possible issue when stopping a connection from a
message listener. Our application listens for a "start" message, upon
which it does the following in a message listener:
1. It stops the connection
2. It creates another session from the connection
3. It constructs one of our objects, passing it the session
4. The object uses the session to create a few more consumers
5. When the constructor returns, the connection is started again
Our object looks something like this:
public final class MultipleListener {
private int state;
private class Listener1 implements MessageListener {
public void onMessage(Message message) {
state += 1;
}
}
private class Listener2 implements MessageListener {
public void onMessage(Message message) {
state += 2;
}
}
public MultipleListener(Session session, Destination destination) throws JMSException {
MessageConsumer consumer1 = session.createConsumer(destination);
consumer1.setMessageListener(new Listener1());
MessageConsumer consumer2 = session.createConsumer(destination);
consumer2.setMessageListener(new Listener2());
}
}
Just to check that I read my JMS spec correctly: since both consumers
are created from the same session, we don't have to synchronize access
to MultipleListener's state since only one thread from the messaging
system will call into the message listeners at any given time. Is this
right?
And we're doing something like this to create MultipleListeners:
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(...);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new StartListener(connection));
connection.start();
With StartListener something like:
class StartListener implements MessageListener {
private Connection connection;
public StartListener(Connection connection) {
this.connection = connection;
}
public void onMessage(Message message) {
connection.stop();
new MultipleListener(connection.createSession(...));
connection.start();
}
}
The reason I'm stopping the connection prior to making the
MultipleListener is that I don't want the message listeners to be
called until the constructor is done. The thinking here is that all
MultipleListener's internal state might not be ready to receive messages
until the constructor has returned.
Is this a valid concern? Is there a way to deal with this without
stopping the connection?
We'd like each MultipleListener to have its own session so that they
can process messages in parallel, so plugging them in on the same
session as StartListener isn't really what we want to do.
We could make this single session work by putting in a queue and having
a thread we create pass messages on to the MultipleListeners, but I'm
hoping we can get away using only ActiveMQ's Session tasks (or is this
an abuse of the session threads?). If we shouldn't be using the session
threads to drive our components, is there a nice example somewhere of
a good alternative (probably some kind of thread pool)?
With all that said, we're seeing some problems when the message listener
stops the connection.
The threads end up looking like this:
Name: ActiveMQ Transport: tcp://localhost/192.168.1.63:61616
State: RUNNABLE
Total blocked: 44 Total waited: 0
Stack trace:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:129)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:49)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:56)
java.io.DataInputStream.readInt(DataInputStream.java:370)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
java.lang.Thread.run(Thread.java:619)
====
Name: ActiveMQ Scheduler
State: TIMED_WAITING on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@18b7298
Total blocked: 0 Total waited: 183
Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1927)
java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:582)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:575)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:946)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
java.lang.Thread.run(Thread.java:619)
=========
Name: ActiveMQ Scheduler
State: TIMED_WAITING on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@18b7298
Total blocked: 0 Total waited: 203
Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1927)
java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:582)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:575)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:946)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
java.lang.Thread.run(Thread.java:619)
=========
Name: ActiveMQ Session Task
State: WAITING on org.apache.activemq.thread.PooledTaskRunner$1@d997f9
Total blocked: 0 Total waited: 5
Stack trace:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
org.apache.activemq.thread.PooledTaskRunner.shutdown(PooledTaskRunner.java:89)
org.apache.activemq.ActiveMQSessionExecutor.stop(ActiveMQSessionExecutor.java:117)
org.apache.activemq.ActiveMQSession.stop(ActiveMQSession.java:1468)
org.apache.activemq.ActiveMQConnection.stop(ActiveMQConnection.java:495)
XXX ... our onMessage handler that stops the connection ... XXX
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:870)
- locked java.lang.Object@17644c8
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:99)
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:166)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
org.apache.activemq.thread.PooledTaskRunner.access$100(PooledTaskRunner.java:26)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)
=========
Name: ActiveMQ Session Task
State: BLOCKED on java.lang.Object@17644c8 owned by: ActiveMQ Session
Task
Total blocked: 1 Total waited: 0
Stack trace:
org.apache.activemq.MessageDispatchChannel.dequeueNoWait(MessageDispatchChannel.java:93)
org.apache.activemq.ActiveMQMessageConsumer.iterate(ActiveMQMessageConsumer.java:928)
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:156)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
org.apache.activemq.thread.PooledTaskRunner.access$100(PooledTaskRunner.java:26)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)
=========
I haven't been able to reproduce this problem in a test case yet (seems
to depend very much on the timing between "start" messages), but maybe
someone can spot something using the stack traces?
Are we supposed to be able to stop the connection from inside a message
listener or is this pattern broken? If it's broken, how should one deal
with this kind of situation where you want to add a bunch of consumers
to an already-started connection but only want to enable them once
everyone is there (and once you've done some more setup)?
We're running the latest ActiveMQ 4.2 from trunk.
Thanks for reading.
Cheers,
Albert