Posted to users@activemq.apache.org by ravimbhatt <ra...@qubitdigital.com> on 2011/09/21 19:00:00 UTC

Stops Message Delivery to consumer: QueueConnection Stop Waits forever

[Posting again as my previous post did not get the required help]
Hi All, 

I am facing a very strange problem with ActiveMQ. 

I have a Java class that attaches itself to a queue. This class holds a
thread pool executor and, once it gets a message, it uses the executor to
run a task with the just-arrived message. 

It keeps track of how many threads are currently performing tasks. Once it
reaches a MAX, say 4, it stops the connection so that it does not get more
messages from the queue. As and when any one of the tasks completes, it
starts the connection again. 
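
For comparison, the same "at most MAX tasks at once" rule can be sketched
without JMS. This is not the code from my class: it swaps the connection
stop/start for a java.util.concurrent.Semaphore, and ThrottleSketch,
handle() and runDemo() are hypothetical names made up for this example.
Whether blocking the delivering thread like this suits a real ActiveMQ
listener is an assumption I have not tested.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the listener class. No JMS is involved.
public class ThrottleSketch {
    static final int MAX_THREADS = 4;

    private final Semaphore permits = new Semaphore(MAX_THREADS);
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    // Plays the role of onMessage: the calling thread blocks here while
    // MAX_THREADS tasks are already in flight.
    void handle(Runnable work) throws InterruptedException {
        permits.acquire();
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        pool.execute(() -> {
            try {
                work.run();
            } finally {
                // Plays the role of update(): a finished task frees one slot.
                running.decrementAndGet();
                permits.release();
            }
        });
    }

    // Pushes `messages` fake messages through and returns the peak number
    // of tasks that were in flight at the same time.
    static int runDemo(int messages) {
        ThrottleSketch sketch = new ThrottleSketch();
        try {
            for (int i = 0; i < messages; i++) {
                sketch.handle(() -> sleepQuietly(20));
            }
            sketch.pool.shutdown();
            sketch.pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.peak.get();
    }

    // Simulated work: sleeps, restoring the interrupt flag if interrupted.
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak tasks in flight: " + runDemo(12));
    }
}
```

The counter is incremented only after a permit is acquired and decremented
before it is released, so the reported peak cannot exceed MAX_THREADS.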

This setup works for a few messages and then suddenly my class waits forever
in the connection's stop method. It never returns! 

Here is my onMessage method: 

public void onMessage(Message objectMessage) { 
    if (objectMessage instanceof ObjectMessage) { 
      try { 
        Serializable serializableObject = ((ObjectMessage) objectMessage) 
            .getObject(); 

        // we look for MyRequest object, we ignore 
        // everything else that was sent by mistake! 
        // TODO: check if it's possible to reject a message delivery. reject
        // the ones we are not looking for. 
        if (serializableObject instanceof MyRequest) { 
          Log.getLogger().log(Level.INFO, 
              Thread.currentThread().getName() + " received an object"); 
          MyRequest myRequest = (MyRequest) serializableObject; 
          MyWorker myWorker = new MyWorker( 
              myRequest); 
          // make this object an observer of this categoriser 
          myWorker.addObserver(this); 
          threadPoolExecutor.execute(myWorker); 
          // increment running thread count; it's inside synchronized as some 
          // other thread may call update and try to decrement the count at
          // the same time when this thread is incrementing it. 
          Log.getLogger().log( 
              Level.INFO, 
              Thread.currentThread().getName() 
                  + " Entering synchronised block..."); 
          synchronized (this) { 
            runningThreads++; 

            Log.getLogger().log( 
                Level.INFO, 
                Thread.currentThread().getName() + " Running: " 
                    + runningThreads + " Max:" + MAX_THREADS); 

            // now check if we are running max allowed thread for this
            // machine. 
            if (runningThreads == MAX_THREADS) { 
              Log.getLogger() 
                  .log( 
                      Level.INFO, 
                      Thread.currentThread().getName() 
                          + " Reached max threads... stopping feedback message consumer!!"); 
              // stop receiving more messages. 
              stopConsumption(); 

            } 
          } 

          Log.getLogger().log( 
              Level.INFO, 
              Thread.currentThread().getName() 
                  + " out of synchronised block...."); 

        } 
      } catch (JMSException e) { 
        e.printStackTrace(); 
      } 
    } 
  } 


Please note that this class adds itself as a listener to the MyWorker object: 
  // make this object an observer of this categoriser 
   myWorker.addObserver(this);

Once complete, MyWorker updates this class as below: 
public void update(Observable observableSource, 
      MyStatus status) { 
    // TODO: work with status object later. 
    if (observableSource instanceof MyWorker) { 
      MyWorker myWorker = (MyWorker) observableSource; 
      // notify observers about categorization being over. 
      notifyObservers((MyResult) myWorker.getResult()); 

      Log.getLogger().log(Level.INFO, 
          Thread.currentThread().getName() + ": Notified observers..."); 

      synchronized (this) { 
        Log.getLogger() 
            .log( 
                Level.INFO, 
                Thread.currentThread().getName() 
                    + ": One of the threads completed... starting message consumer if max was reached!!"); 
        runningThreads--; 
        Log.getLogger().log( 
            Level.INFO, 
            Thread.currentThread().getName() + " Running: " + runningThreads 
                + " Max:" + MAX_THREADS); 
        // start consuming only when we were at max. need not call 
        // startConsumption on each thread completion. 
        if (runningThreads == MAX_THREADS - 1) { 
          startConsumption(); 
        } 
      } 
    } 
  } 

Since there are multiple threads running and they can all call update at
the same time, I have used a synchronized block to decrement runningThreads
and call the startConsumption() method. 

It works for some random number of messages and then it hangs at the call to 
stopConsumption(). 

Here are my start and stop consumption methods. 

  /** 
   * stops listening to the queue 
   */ 
  private void stopConsumption() { 
    try { 
      conn.stop(); 
    } catch (JMSException e) { 
      e.printStackTrace(); 
    } 
  } 

  /** 
   * starts listening to the queue again. 
   */ 
  private void startConsumption() { 
    try { 
      conn.start(); 
    } catch (JMSException e) { 
      e.printStackTrace(); 
    } 
  } 

Looking at the threads in jconsole, I can see that it waits in the call to
conn.stop() and keeps on waiting forever. 

Here is how I connect to the queue in my run() method: 

  @Override 
  public void run() { 
    // create an ActiveMQ queue consumer and register this 
    // object as a JMS message listener. 
    // try connecting to ActiveMQ broker 
    try { 
      javax.naming.Context ctx = ActiveMQInitialContext.getInstance(); 
      // lookup the connection factory 
      javax.jms.QueueConnectionFactory factory = (javax.jms.QueueConnectionFactory) ctx 
          .lookup(PropertyManager.getProperty("JMS_CONNECTION_FACTORY_NAME")); 
      
      conn = factory.createQueueConnection(); 
      conn.start(); 
      // lookup an existing queue 
      javax.jms.Queue myqueue = (Queue) ctx.lookup(PropertyManager 
          .getProperty("JMS_REQUEST_QUEUE_NAME")); 
      // create a new queueSession for the client 
      QueueSession session = conn.createQueueSession(false, 
          QueueSession.AUTO_ACKNOWLEDGE); 

      // create a new subscriber to receive messages 
      MessageConsumer subscriber = session.createConsumer(myqueue); 
      subscriber.setMessageListener(this); 

      Log.getLogger().log( 
          Level.INFO, 
          Thread.currentThread().getName() + ": Attached to: " 
              + PropertyManager.getProperty("JMS_REQUEST_QUEUE_NAME")); 

    } catch (NamingException e) { 
      e.printStackTrace(); 
    } catch (JMSException e) { 
      e.printStackTrace(); 
    } 
  } 

As you can see, I am using auto acknowledge. I have tested this on ActiveMQ
5.4.2 and 5.5.0 and it behaves the same on both versions. I am using Java 6.

Here is a thread dump for the waiting thread. 


Name: ActiveMQ Session Task 
State: WAITING on org.apache.activemq.thread.PooledTaskRunner$1@35e59bda 
Total blocked: 3  Total waited: 4 

Stack trace: 
java.lang.Object.wait(Native Method) 
org.apache.activemq.thread.PooledTaskRunner.shutdown(PooledTaskRunner.java:95) 
org.apache.activemq.thread.PooledTaskRunner.shutdown(PooledTaskRunner.java:102) 
org.apache.activemq.ActiveMQSessionExecutor.stop(ActiveMQSessionExecutor.java:155) 
   - locked org.apache.activemq.ActiveMQSessionExecutor@55f6bd6a 
org.apache.activemq.ActiveMQSession.stop(ActiveMQSession.java:1653) 
org.apache.activemq.ActiveMQConnection.stop(ActiveMQConnection.java:550) 
   - locked java.util.concurrent.CopyOnWriteArrayList@561d0c91 
com.qubit.command.DefaultQueueListenerCommand.stopConsumption(DefaultQueueListenerCommand.java:239) 
com.qubit.command.DefaultQueueListenerCommand.onMessage(DefaultQueueListenerCommand.java:173) 
   - locked com.qubit.command.UrlCategorisationReqConsumerCmd@fed453 
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1229) 
   - locked java.lang.Object@37cd238a 
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:134) 
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:205) 
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122) 
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43) 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
java.lang.Thread.run(Thread.java:662) 


And here is another thread that's blocked on the same object: 


Name: Thread-23 
State: BLOCKED on com.qubit.command.UrlCategorisationReqConsumerCmd@fed453 owned by: ActiveMQ Session Task 
Total blocked: 1,961  Total waited: 84,130 

Stack trace: 
com.qubit.command.DefaultQueueListenerCommand.update(DefaultQueueListenerCommand.java:215) 
com.qubit.categoriser.URLCategoriser.notifyObservers(URLCategoriser.java:263) 
com.qubit.categoriser.URLCategoriser.categorise(URLCategoriser.java:194) 
com.qubit.categoriser.URLCategoriser.run(URLCategoriser.java:270) 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
java.lang.Thread.run(Thread.java:662)
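
One way to read the first dump: the "ActiveMQ Session Task" thread is
running onMessage (dispatched from PooledTaskRunner.runTask) and, higher up
the same stack, is waiting inside PooledTaskRunner.shutdown. That looks like
a thread waiting for a task runner to finish while it is itself executing
that runner's current task. Below is a rough analogy using only a plain
java.util.concurrent executor. It is not ActiveMQ code, and SelfWaitSketch
and waitForOwnExecutor() are hypothetical names for illustration. It assumes
the reading above is right, and it uses a timeout so that it ends instead of
hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Analogy only: a plain single-thread executor stands in for the session's
// dispatch thread. None of this is ActiveMQ code.
public class SelfWaitSketch {

    // Runs a task that, from inside itself, shuts its own executor down and
    // then waits for it to terminate. Returns whether that wait succeeded.
    static boolean waitForOwnExecutor(long timeoutMillis) {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> result = dispatcher.submit(() -> {
                dispatcher.shutdown();
                // The executor cannot terminate until this very task
                // returns, so this wait can only end by timing out.
                return dispatcher.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated while waiting inside own task: "
                + waitForOwnExecutor(200));
    }
}
```

The wait inside the task times out and returns false, because the executor
is only considered terminated once its running task has returned. Without
the timeout, that wait would never end, which matches the shape of the first
stack trace. For what it is worth, the JMS 2.0 javadoc for Connection.stop()
says a message listener must not attempt to stop its own connection because
this would lead to deadlock.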
