Posted to users@activemq.apache.org by ravimbhatt <ra...@qubitdigital.com> on 2011/06/02 20:18:39 UTC

QueueConnection Stop Waits forever: Stops Message Delivery to consumer

Hi All,

I am facing a very strange problem with ActiveMQ. 

I have a Java class that registers itself as a listener on a queue. The class
holds a thread pool executor, and each time it receives a message it uses the
executor to run a task with that message.

It keeps track of how many threads are currently running tasks. Once the count
reaches a maximum, say 4, it stops the connection so that no more messages are
delivered from the queue. As soon as any one of the tasks completes, it starts
the connection again.

This setup works for a few messages, and then my class suddenly waits forever
in the connection's stop() method. The call never returns.

Here is my onMessage method:

public void onMessage(Message objectMessage) {
    if (objectMessage instanceof ObjectMessage) {
      try {
        Serializable serializableObject = ((ObjectMessage) objectMessage)
            .getObject();

        // we look for MyRequest object, we ignore
        // everything else that was sent by mistake!
        // TODO: check if it's possible to reject a message delivery and
        // reject the ones we are not looking for.
        if (serializableObject instanceof MyRequest) {
          Log.getLogger().log(Level.INFO,
              Thread.currentThread().getName() + " received an object");
          MyRequest myRequest = (MyRequest) serializableObject;
          MyWorker myWorker = new MyWorker(
              myRequest);
          // make this object an observer of this categorisor
          myWorker.addObserver(this);
          threadPoolExecutor.execute(myWorker);
          // increment running thread count. It is inside synchronized because
          // some other thread may call update() and try to decrement the count
          // at the same time this thread is incrementing it.
          Log.getLogger().log(
              Level.INFO,
              Thread.currentThread().getName()
                  + " Entering synchronised block...");
          synchronized (this) {
            runningThreads++;

            Log.getLogger().log(
                Level.INFO,
                Thread.currentThread().getName() + " Running: "
                    + runningThreads + " Max:" + MAX_THREADS);

            // now check if we are running the max allowed threads for this
            // machine.
            if (runningThreads == MAX_THREADS) {
              Log.getLogger().log(
                  Level.INFO,
                  Thread.currentThread().getName()
                      + " Reached max threads... stopping feedback message consumer!!");
              // stop receiving more messages.
              stopConsumption();

            }
          }

          Log.getLogger().log(
              Level.INFO,
              Thread.currentThread().getName()
                  + " out of synchronised block....");

        }
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }


Please note that this class adds itself as a listener (observer) to the
MyWorker object:
  // make this object an observer of this categorisor
   myWorker.addObserver(this);

Once a MyWorker completes, it notifies this class through update(), shown below:
public void update(Observable observableSource,
      MyStatus status) {
    // TODO: work with status object later.
    if (observableSource instanceof MyWorker) {
      MyWorker myWorker = (MyWorker) observableSource;
      // notify observers about categorization being over.
      notifyObservers((MyResult) myWorker.getResult());

      Log.getLogger().log(Level.INFO,
          Thread.currentThread().getName() + ": Notified observers...");

      synchronized (this) {
        Log.getLogger().log(
            Level.INFO,
            Thread.currentThread().getName()
                + ": One of the threads completed... starting message consumer if max was reached!!");
        runningThreads--;
        Log.getLogger().log(
            Level.INFO,
            Thread.currentThread().getName() + " Running: " + runningThreads
                + " Max:" + MAX_THREADS);
        // start consuming only when we were at max. need not call
        // startConsumption on each thread completion.
        if (runningThreads == MAX_THREADS - 1) {
          startConsumption();
        }
      }
    }
  }

Since multiple worker threads are running and any of them can call update() at
the same time, I have used a synchronized block to decrement runningThreads and
to call the startConsumption() method.

It works for some random number of messages and then hangs at the call to
stopConsumption().

Here are my start and stop consumption methods:

  /**
   * stops listening to the queue
   */
  private void stopConsumption() {
    try {
      conn.stop();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

  /**
   * starts listening to the queue again.
   */
  private void startConsumption() {
    try {
      conn.start();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

Looking at the threads in jconsole, I can see that it waits in the call to
conn.stop() and keeps on waiting forever.

Here is how I connect to the queue in my run() method:

  @Override
  public void run() {
    // create an ActiveMQ queue consumer and register this
    // object as a JMS message listener.
    // try connecting to the ActiveMQ broker
    try {
      javax.naming.Context ctx = ActiveMQInitialContext.getInstance();
      // lookup the connection factory
      javax.jms.QueueConnectionFactory factory =
          (javax.jms.QueueConnectionFactory) ctx.lookup(
              PropertyManager.getProperty("JMS_CONNECTION_FACTORY_NAME"));

      conn = factory.createQueueConnection();
      conn.start();
      // lookup an existing queue
      javax.jms.Queue myqueue = (Queue) ctx.lookup(PropertyManager
          .getProperty("JMS_REQUEST_QUEUE_NAME"));
      // create a new queueSession for the client
      QueueSession session = conn.createQueueSession(false,
          QueueSession.AUTO_ACKNOWLEDGE);

      // create a new subscriber to receive messages
      MessageConsumer subscriber = session.createConsumer(myqueue);
      subscriber.setMessageListener(this);

      Log.getLogger().log(
          Level.INFO,
          Thread.currentThread().getName() + ": Attached to: "
              + PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME"));

    } catch (NamingException e) {
      e.printStackTrace();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

As you can see, I am using AUTO_ACKNOWLEDGE. I have tested this on ActiveMQ
5.4.2 and 5.5.0, and it behaves the same on both versions. I am using Java 6.
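
The throttling described in this message (stop taking messages once MAX_THREADS
tasks are running, resume when one finishes) can also be expressed without
calling conn.stop() from inside the listener. The following is only a sketch
under assumptions, not code from this post: the class name ThrottledDispatcher
and its methods are hypothetical, no JMS types are used, and it needs Java 8 or
later for the lambdas. The idea is that onMessage() would call dispatch(),
which blocks the delivery thread on a java.util.concurrent.Semaphore while all
slots are busy. Any broker-side buffering of messages is outside this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the listener class; no JMS types are used here.
class ThrottledDispatcher {
    private final Semaphore permits;
    private final ExecutorService pool;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    ThrottledDispatcher(int maxThreads) {
        this.permits = new Semaphore(maxThreads);
        this.pool = Executors.newFixedThreadPool(maxThreads);
    }

    // Would be called from onMessage(): blocks the calling thread while
    // maxThreads tasks are already running, instead of stopping the connection.
    void dispatch(Runnable work) throws InterruptedException {
        permits.acquire();
        try {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    work.run();
                } finally {
                    running.decrementAndGet();
                    permits.release(); // frees a slot; plays the role of update()
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // the task was never accepted, so give the slot back
            throw e;
        }
    }

    int peakConcurrency() {
        return peak.get();
    }

    void shutdown() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    // Pushes `tasks` short jobs through a dispatcher limited to `max` slots
    // and returns the highest number of jobs seen running at once.
    static int demo(int max, int tasks) {
        try {
            ThrottledDispatcher d = new ThrottledDispatcher(max);
            CountDownLatch done = new CountDownLatch(tasks);
            for (int i = 0; i < tasks; i++) {
                d.dispatch(() -> {
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    done.countDown();
                });
            }
            done.await();
            d.shutdown();
            return d.peakConcurrency();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + demo(4, 12));
    }
}
```

In this sketch the permit is released in a finally block on the worker thread,
so there is no shared counter to synchronize on and nothing is called back on
the listener object when a task ends.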


--
View this message in context: http://activemq.2283324.n4.nabble.com/QueueConnection-Stop-Waits-forever-Stops-Message-Delivery-to-consumer-tp3569019p3569019.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: QueueConnection Stop Waits forever: Stops Message Delivery to consumer

Posted by ravimbhatt <ra...@qubitdigital.com>.
Hi, 

I was on vacation for a while.

Here is a thread dump for the waiting thread:


Name: ActiveMQ Session Task 
State: WAITING on org.apache.activemq.thread.PooledTaskRunner$1@35e59bda 
Total blocked: 3  Total waited: 4 

Stack trace: 
java.lang.Object.wait(Native Method) 
org.apache.activemq.thread.PooledTaskRunner.shutdown(PooledTaskRunner.java:95) 
org.apache.activemq.thread.PooledTaskRunner.shutdown(PooledTaskRunner.java:102) 
org.apache.activemq.ActiveMQSessionExecutor.stop(ActiveMQSessionExecutor.java:155) 
   - locked org.apache.activemq.ActiveMQSessionExecutor@55f6bd6a 
org.apache.activemq.ActiveMQSession.stop(ActiveMQSession.java:1653) 
org.apache.activemq.ActiveMQConnection.stop(ActiveMQConnection.java:550) 
   - locked java.util.concurrent.CopyOnWriteArrayList@561d0c91 
com.qubit.command.DefaultQueueListenerCommand.stopConsumption(DefaultQueueListenerCommand.java:239) 
com.qubit.command.DefaultQueueListenerCommand.onMessage(DefaultQueueListenerCommand.java:173) 
   - locked com.qubit.command.UrlCategorisationReqConsumerCmd@fed453 
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1229) 
   - locked java.lang.Object@37cd238a 
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:134) 
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:205) 
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122) 
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43) 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
java.lang.Thread.run(Thread.java:662) 


And here is another thread that is blocked on the same object:


Name: Thread-23 
State: BLOCKED on com.qubit.command.UrlCategorisationReqConsumerCmd@fed453 owned by: ActiveMQ Session Task 
Total blocked: 1,961  Total waited: 84,130 

Stack trace: 
com.qubit.command.DefaultQueueListenerCommand.update(DefaultQueueListenerCommand.java:215) 
com.qubit.categoriser.URLCategoriser.notifyObservers(URLCategoriser.java:263) 
com.qubit.categoriser.URLCategoriser.categorise(URLCategoriser.java:194) 
com.qubit.categoriser.URLCategoriser.run(URLCategoriser.java:270) 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
java.lang.Thread.run(Thread.java:662) 
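
Reading the two traces together: in the first one, ActiveMQConnection.stop is
called from onMessage, the frames below it show onMessage itself being run by
PooledTaskRunner.runTask, and the top of the stack is waiting in
PooledTaskRunner.shutdown. Read that way, the session task appears to be
waiting for work that it is itself still executing, and Thread-23 is simply
queued behind it on the listener's monitor. This would be consistent with the
JMS javadoc for Connection.stop(), which says the call waits until running
message listeners have returned. The sketch below is only an analogue built on
java.util.concurrent, not ActiveMQ code; the class name SelfWaitDemo is
hypothetical, and a timeout is used so that the example returns instead of
hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of "a listener stops the thing that is running it".
class SelfWaitDemo {

    // Returns true only if the executor managed to terminate while one of
    // its own tasks was still waiting for that termination.
    static boolean terminatedFromInside(long timeoutMillis) {
        ExecutorService runner = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> result = runner.submit(() -> {
                // Analogue of onMessage() calling conn.stop(): request shutdown
                // and wait for the runner to drain, from inside a running task.
                runner.shutdown();
                return runner.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            runner.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated while waiting on itself: "
            + terminatedFromInside(200));
    }
}
```

The executor cannot reach the terminated state until the task returns, and the
task does not return until the wait ends, so without the timeout this pattern
would wait indefinitely.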

--
View this message in context: http://activemq.2283324.n4.nabble.com/QueueConnection-Stop-Waits-forever-Stops-Message-Delivery-to-consumer-tp3569019p3683430.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: QueueConnection Stop Waits forever: Stops Message Delivery to consumer

Posted by Gary Tully <ga...@gmail.com>.
If you post a thread dump taken while you are blocked on the stop call, from
something like kill -3 or jstack, we may be able to spot the problem.
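
Besides kill -3 and jstack, a dump can also be captured from inside the JVM
with the standard java.lang.management API. The following is a minimal sketch
and not something from this thread: the class name ThreadDumpSketch and the
output layout are made up for illustration, and lock details are only included
where the JVM reports that it supports them.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper that formats an in-process thread dump as text.
class ThreadDumpSketch {

    static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Ask for monitor and synchronizer details only if the JVM supports them.
        ThreadInfo[] infos = mx.dumpAllThreads(
            mx.isObjectMonitorUsageSupported(),
            mx.isSynchronizerUsageSupported());

        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : infos) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState());
            if (info.getLockName() != null) {
                out.append(" on ").append(info.getLockName());
            }
            if (info.getLockOwnerName() != null) {
                out.append(" owned by ").append(info.getLockOwnerName());
            }
            out.append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Calling dump() from a watchdog thread when stop() has not returned after some
time would record the waiting and blocked threads along with the lock each one
is waiting on.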

-- 
http://fusesource.com
http://blog.garytully.com