Posted to users@activemq.apache.org by Albert Strasheim <13...@sun.ac.za> on 2007/01/31 13:02:25 UTC

Client getting stuck when stopping connection from message listener

Hello all

We've run into a possible issue when stopping a connection from a 
message listener. Our application listens for a "start" message, upon 
which it does the following in a message listener:

1. It stops the connection
2. It creates another session from the connection
3. It constructs one of our objects, passing it the session
4. The object uses the session to create a few more consumers
5. When the constructor returns, the connection is started again

Our object looks something like this:

public final class MultipleListener {
  private int state;
  private class Listener1 implements MessageListener {
    public void onMessage(Message message) {
      state += 1;
    }
  }
  private class Listener2 implements MessageListener {
    public void onMessage(Message message) {
      state += 2;
    }
  }
  public MultipleListener(Session session, Destination destination) throws JMSException {
    MessageConsumer consumer1 = session.createConsumer(destination);
    consumer1.setMessageListener(new Listener1());
    MessageConsumer consumer2 = session.createConsumer(destination);
    consumer2.setMessageListener(new Listener2());
  }
}

Just to check that I read my JMS spec correctly: since both consumers 
are created from the same session, we don't have to synchronize access 
to MultipleListener's state since only one thread from the messaging
system will call into the message listeners at any given time. Is this 
right?
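
A hedged illustration of that reading, without any JMS classes: if a session delivers to all of its listeners serially, it behaves roughly like a single-threaded executor, and unsynchronized state shared by two listeners is updated without lost increments. The class name SerialDeliverySketch and the executor standing in for the session are assumptions made for this sketch, not ActiveMQ internals.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a JMS session: one thread runs every listener call.
class SerialDeliverySketch {
    private int state; // deliberately unsynchronized, like MultipleListener.state

    // Simulates `messages` deliveries to each of two listeners on one "session" thread.
    int deliver(int messages) {
        ExecutorService session = Executors.newSingleThreadExecutor();
        try {
            Future<?> last = null;
            for (int i = 0; i < messages; i++) {
                session.submit(() -> { state += 1; });        // Listener1.onMessage
                last = session.submit(() -> { state += 2; }); // Listener2.onMessage
            }
            if (last != null) {
                // Waiting on the last task makes all earlier updates visible here.
                last.get(10, TimeUnit.SECONDS);
            }
            return state;
        } catch (Exception e) {
            throw new IllegalStateException("deliveries did not finish", e);
        } finally {
            session.shutdown();
        }
    }
}
```

With two listeners adding 1 and 2 per message, deliver(n) returns 3 * n. The same code run on a multi-threaded pool could lose updates, which is the case the question is about.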

And we're doing something like this to create MultipleListeners:

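Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(...);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new StartListener(connection, destination));
connection.start();

With StartListener something like:

class StartListener implements MessageListener {
    private final Connection connection;
    // destination for the new consumers (assumed to be known here)
    private final Destination destination;
    public StartListener(Connection connection, Destination destination) {
        this.connection = connection;
        this.destination = destination;
    }
    public void onMessage(Message message) {
        try {
            // pause delivery while the new consumers are being set up
            connection.stop();
            new MultipleListener(connection.createSession(...), destination);
            connection.start();
        } catch (JMSException e) {
            // onMessage cannot throw the checked JMSException
            throw new RuntimeException(e);
        }
    }
}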

The reason I'm stopping the connection prior to making the 
MultipleListener is that I don't want the message listeners to be 
called until the constructor is done. The thinking here is that not all 
of MultipleListener's internal state might be ready to receive messages 
until the constructor has returned.

Is this a valid concern? Is there a way to deal with this without 
stopping the connection?
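
One possible approach, sketched here without JMS: have each listener wait on a latch that the setup code releases once everything is ready, so an early delivery blocks instead of seeing half-built state. GatedListener, open() and handledCount() are hypothetical names, and the String parameter stands in for a javax.jms.Message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical listener wrapper: deliveries wait until open() has been called.
class GatedListener {
    private final CountDownLatch ready = new CountDownLatch(1);
    private final AtomicInteger handled = new AtomicInteger();

    // Called by the delivering thread, in the role of onMessage.
    void onMessage(String message) {
        try {
            ready.await(); // blocks until setup is finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        handled.incrementAndGet(); // real processing would go here
    }

    // Called once by the setup code after the constructor work is done.
    void open() {
        ready.countDown();
    }

    int handledCount() {
        return handled.get();
    }
}
```

Blocking a delivery thread like this holds up everything else that thread would deliver, so this is only a sketch of the idea. A simpler variant of the same idea is to initialise every field first and call setMessageListener as the very last step of the setup.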

We'd like each MultipleListener to have its own session so that they 
can process messages in parallel, so plugging them in on the same 
session as StartListener isn't really what we want to do.

We could make this single session work by putting in a queue and having 
a thread we create pass messages on to the MultipleListeners, but I'm 
hoping we can get away with using only ActiveMQ's Session tasks (or is 
this an abuse of the session threads?). If we shouldn't be using the 
session threads to drive our components, is there a nice example 
somewhere of a good alternative (probably some kind of thread pool)?
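
A minimal sketch of the thread-pool alternative, in plain Java: the session's listener only hands the message over, and each component owns a single-thread executor, so its own messages are still processed one at a time while different components run in parallel. WorkerComponent, enqueue() and shutdownAndGetState() are hypothetical names, and an int stands in for the message.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical component with its own worker thread. The session listener would
// only call enqueue(), so the session thread is never tied up by processing.
class WorkerComponent {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private int state; // touched only by the worker thread

    // Cheap hand-off; in a real listener this would be called from onMessage.
    void enqueue(int increment) {
        worker.execute(() -> { state += increment; });
    }

    // Drains queued work, stops the worker, and returns the final state.
    int shutdownAndGetState() {
        try {
            // Submitted last, so it runs after every earlier update on the worker thread.
            int result = worker.submit(() -> state).get(10, TimeUnit.SECONDS);
            worker.shutdown();
            return result;
        } catch (Exception e) {
            worker.shutdownNow();
            throw new IllegalStateException("worker did not drain", e);
        }
    }
}
```

One trade-off to keep in mind: with automatic acknowledgement a message is acknowledged when onMessage returns, so a message that has been handed off but not yet processed can be lost if the client dies in between.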

With all that said, we're seeing some problems when the message listener 
stops the connection.

The threads end up looking like this:

Name: ActiveMQ Transport: tcp://localhost/192.168.1.63:61616
State: RUNNABLE
Total blocked: 44  Total waited: 0

Stack trace: 
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:129)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:49)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:56)
java.io.DataInputStream.readInt(DataInputStream.java:370)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
java.lang.Thread.run(Thread.java:619)

=========

Name: ActiveMQ Scheduler
State: TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@18b7298
Total blocked: 0  Total waited: 183

Stack trace: 
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1927)
java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:582)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:575)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:946)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
java.lang.Thread.run(Thread.java:619)

=========

Name: ActiveMQ Scheduler
State: TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@18b7298
Total blocked: 0  Total waited: 203

Stack trace: 
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1927)
java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:582)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:575)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:946)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
java.lang.Thread.run(Thread.java:619)

=========

Name: ActiveMQ Session Task
State: WAITING on org.apache.activemq.thread.PooledTaskRunner$1@d997f9
Total blocked: 0  Total waited: 5

Stack trace: 
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
org.apache.activemq.thread.PooledTaskRunner.shutdown(PooledTaskRunner.java:89)
org.apache.activemq.ActiveMQSessionExecutor.stop(ActiveMQSessionExecutor.java:117)
org.apache.activemq.ActiveMQSession.stop(ActiveMQSession.java:1468)
org.apache.activemq.ActiveMQConnection.stop(ActiveMQConnection.java:495)

XXX ... our onMessage handler that stops the connection ... XXX

org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:870)
   - locked java.lang.Object@17644c8
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:99)
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:166)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
org.apache.activemq.thread.PooledTaskRunner.access$100(PooledTaskRunner.java:26)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)

=========

Name: ActiveMQ Session Task
State: BLOCKED on java.lang.Object@17644c8 owned by: ActiveMQ Session Task
Total blocked: 1  Total waited: 0

Stack trace: 
org.apache.activemq.MessageDispatchChannel.dequeueNoWait(MessageDispatchChannel.java:93)
org.apache.activemq.ActiveMQMessageConsumer.iterate(ActiveMQMessageConsumer.java:928)
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:156)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
org.apache.activemq.thread.PooledTaskRunner.access$100(PooledTaskRunner.java:26)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)

=========

I haven't been able to reproduce this problem in a test case yet (it 
seems to depend very much on the timing between "start" messages), but 
maybe someone can spot something using the stack traces?
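
One hedged reading of the last two traces: the first Session Task holds java.lang.Object@17644c8 (taken in ActiveMQMessageConsumer.dispatch) while it waits in PooledTaskRunner.shutdown, and the second Session Task is BLOCKED on that same object inside PooledTaskRunner.runTask. If shutdown is waiting for that second task to finish iterating, neither thread can make progress. The Connection.stop Javadoc says stop waits for running message listeners to return, which would be consistent with this. The sketch below reproduces only the shape of such a cycle in plain Java; it is not ActiveMQ code, all names are hypothetical, and a timeout is used so that it terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical reproduction of the suspected cycle, with a timeout so it terminates.
class StopFromListenerSketch {
    // Returns true if the "stop" call timed out waiting for the other task.
    static boolean stopTimesOut(long waitMillis) {
        final Object dispatchLock = new Object();
        final CountDownLatch lockHeld = new CountDownLatch(1);
        final CountDownLatch otherTaskDone = new CountDownLatch(1);

        // Plays the BLOCKED session task: needs dispatchLock to finish iterating.
        Thread other = new Thread(() -> {
            try {
                lockHeld.await();
                synchronized (dispatchLock) {
                    // the dequeue would happen here
                }
                otherTaskDone.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();

        try {
            boolean finished;
            // Plays the listener thread: holds dispatchLock, then "stops" by
            // waiting for the other task to finish.
            synchronized (dispatchLock) {
                lockHeld.countDown();
                finished = otherTaskDone.await(waitMillis, TimeUnit.MILLISECONDS);
            }
            other.join(); // lock released above, so the other task can now finish
            return !finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Without the timeout, the wait inside the synchronized block would never return, which matches the WAITING / BLOCKED pair in the dump.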

Are we supposed to be able to stop the connection from inside a message 
listener or is this pattern broken? If it's broken, how should one deal 
with this kind of situation where you want to add a bunch of consumers 
to an already-started connection but only want to enable them once 
everyone is there (and once you've done some more setup)?

We're running the latest ActiveMQ 4.2 from trunk.

Thanks for reading.

Cheers,

Albert