Posted to users@activemq.apache.org by xabhi <xa...@gmail.com> on 2015/01/07 15:36:06 UTC

Consumer not able to consumer messages from queue

Hi,
I have a session-aware message listener inside a DMLC with 5 concurrent
consumers consuming messages from a queue. Every time the listener receives a
message, it creates a producer on the same session object and sends a message
to a topic.

This setup runs fine for around 15 days and then suddenly the consumers stop
consuming messages from the queue and this continues until the broker is
restarted.

I think my issue may be related to this
http://activemq.2283324.n4.nabble.com/ActiveMQ-message-dequeuing-hangs-td4681366.html
but I am not sure.

Where should I start looking for the possible issues?

I am using Spring 4.0.5, ActiveMQ 5.10, and ActiveMQ's pooled connection
factory.

Thanks,
Abhi



--
View this message in context: http://activemq.2283324.n4.nabble.com/Consumer-not-able-to-consumer-messages-from-queue-tp4689594.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Consumer not able to consumer messages from queue

Posted by xabhi <xa...@gmail.com>.
Hi,
A similar issue happened again in my ActiveMQ broker setup: the broker
didn't deliver any messages to a queue consumer for about an hour. I could
see messages being held on the broker, and the count kept increasing during
this time, even though other destinations were working fine, with messages
being enqueued and dequeued. The message TTL on this queue was set to 2 hours.

I am not able to explain this behavior in my setup. When does the broker hold
messages, and what else should I be looking for? After an hour or so, all the
held messages were delivered and the consumer started receiving messages again.

Please help!

Thanks,
Abhi




Re: Consumer not able to consumer messages from queue

Posted by Tim Bain <tb...@alumni.duke.edu>.
Mmm, I misunderstood what you'd written about where the bottleneck was
occurring.  Based on what you've said, it sounds like maybe your cursor is
getting full of messages for which no consumer exists (maybe because
they're in a session that was already closed?) and therefore the broker
can't pull out any of the messages matching the selectors of consumers that
are actually connected.  That's pure speculation on my part, but it might
be what's going on.  But it can only explain the problem if message TTL
wasn't being set (or was being set to a very large value, such that your
messages weren't expiring fast enough).  Can you browse the queue and
inspect the messages on it and confirm that the JMSExpiration header really
is set and that it's set to a value that's what you expect?  If it's set,
then my guess is probably wrong, but if it's not, then fix the code that's
trying to set it and see if this problem disappears...
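
For reference when reading that header: JMSExpiration is an absolute time in
milliseconds since the epoch (send time plus TTL), and a value of 0 means the
message never expires. Below is a minimal Java sketch for interpreting a value
copied from a browsed message. The class and method names are hypothetical and
are not part of JMS or ActiveMQ.

```java
import java.time.Duration;
import java.time.Instant;

/** Interprets a JMSExpiration header value read from a browsed message. */
final class ExpirationCheck {

    /**
     * jmsExpiration is an absolute time in epoch milliseconds;
     * 0 means no expiration was set.
     */
    static String describe(long jmsExpiration, long nowMillis) {
        if (jmsExpiration == 0L) {
            return "no expiration set (message never expires)";
        }
        long remaining = jmsExpiration - nowMillis;
        if (remaining <= 0) {
            return "expired " + Duration.ofMillis(-remaining) + " ago";
        }
        return "expires in " + Duration.ofMillis(remaining)
                + " (at " + Instant.ofEpochMilli(jmsExpiration) + ")";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Hypothetical header values, e.g. copied from a queue browser.
        System.out.println(describe(0L, now));
        System.out.println(describe(now + Duration.ofHours(2).toMillis(), now));
    }
}
```

With a 2-hour TTL, a freshly sent message should report roughly two hours
remaining; a value of 0 would mean the TTL never reached the message.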

On Tue, Jan 13, 2015 at 10:08 AM, xabhi <xa...@gmail.com> wrote:

> The producer is not blocked, as the msgsHeld value keeps increasing; it's
> the consumer that stops consuming messages.
>
> >>Also, I was under the impression that it wasn't possible to set message
> >>TTL on the broker and that it could only be set by the producer;
>
> Yes, sorry about that; it can be set by the producer only. However, you can
> override the send method of the BrokerPluginSupport class to set the message
> TTL when the broker receives a message (which we do in cases where the
> producer hasn't specified a TTL value).
>
> Thanks,
> Abhi

Re: Consumer not able to consumer messages from queue

Posted by xabhi <xa...@gmail.com>.
The producer is not blocked, as the msgsHeld value keeps increasing; it's the
consumer that stops consuming messages.

>>Also, I was under the impression that it wasn't possible to set message TTL
>>on the broker and that it could only be set by the producer;

Yes, sorry about that; it can be set by the producer only. However, you can
override the send method of the BrokerPluginSupport class to set the message
TTL when the broker receives a message (which we do in cases where the
producer hasn't specified a TTL value).

Thanks,
Abhi




Re: Consumer not able to consumer messages from queue

Posted by Tim Bain <tb...@alumni.duke.edu>.
I can't speak to the question about closing the session (I use Camel to
interact with ActiveMQ, so I don't directly deal with the session), but
even if that explains the increasing msgsHeld value, it doesn't explain the
producer slowing down and eventually stopping its sends.  So there might be
multiple problems here.

What code calls your publish() method periodically?  And can you find out
(e.g. by taking the thread dump Art referenced) whether you're blocked on
the send() call or somewhere else when it all comes to a halt?

Also, I was under the impression that it wasn't possible to set message TTL
on the broker and that it could only be set by the producer; can you
provide a reference to the documentation for the setting you're using?
On Jan 13, 2015 1:03 AM, "xabhi" <xa...@gmail.com> wrote:

> Thanks for the reply.
>
> I have disabled producer flow control on both topics and queues in my
> broker configuration and I have message TTL specified on broker side.
>

Re: Consumer not able to consumer messages from queue

Posted by xabhi <xa...@gmail.com>.
Thanks for the reply.

I have disabled producer flow control on both topics and queues in my broker
configuration, and I have the message TTL specified on the broker side.

The destination on which heartbeats are sent is a queue with 5 concurrent
consumers listening on it, and the reply-to destination is a topic with
multiple subscribers.

When all this went down, the enqueue count of the topic destination was not
increasing.

I am pasting the code for consumer and producer:

*Producer:*

public boolean publish()
{
    String message = "Heartbeat message";
    boolean responseReceived = false;

    Connection connection = null;
    Session session = null;

    try
    {
        connection = myJmsTemplate.getConnectionFactory().createConnection();
        session    = connection.createSession(transacted, ackMode);

        String correlationId = null;
        Long   timeStamp     = System.currentTimeMillis();
        Random random        = new Random(timeStamp);

        Integer randomPart = random.nextInt(Integer.MAX_VALUE);
        Long    threadId   = Thread.currentThread().getId();
        correlationId      = threadId + "_" + timeStamp + "_" + randomPart;

        String messageSelector = "JMSCorrelationID='" + correlationId + "'";
        MessageConsumer responseConsumer =
            session.createConsumer(receiveDestination, messageSelector);
        connection.start();

        // send a text message to broker
        myJmsTemplate.send(sendDestination,
            new SimpleTextMessageCreator(message, receiveDestination, correlationId));

        LOG.debug("Waiting for message with " + messageSelector + " for " +
            DEFAULT_TIMEOUT + " ms");

        // check for response from broker, DEFAULT_TIMEOUT is 60s.
        TextMessage responseMessage =
            (TextMessage) responseConsumer.receive(DEFAULT_TIMEOUT);
        if (responseMessage != null)
        {
            if (!responseMessage.getJMSCorrelationID().equals(correlationId)) {
                String errorMsg =
                    "Invalid correlation id in response message!!! " +
                    "Expected : " + correlationId +
                    " but received : " + responseMessage.getJMSCorrelationID();

                LOG.error(errorMsg);
                responseReceived = false;
            }
            else {
                LOG.debug("Recieved the response back: " +
                    responseMessage.getText());
                LOG.debug("Correlation id of response message : " +
                    responseMessage.getJMSCorrelationID());
                responseReceived = true;
            }
        }
    }
    catch (JMSException e)
    {
        LOG.error("Error interacting with broker", e);
    }
    catch (Throwable t) {
        LOG.warn("Publish encountered unknown exception.", t);
    }
    finally {
        JmsUtil.closeConnection(connection, session,
            this.getClass().getName());
    }
    return responseReceived;
}
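
A side note on the correlation ID built in publish(): the Random is seeded
with the same timestamp that is embedded in the ID, so the random part is
fully determined by the timestamp and adds no uniqueness beyond thread ID
plus millisecond. This is probably not related to the stalled consumers, but
two JVMs could in principle produce the same ID. A minimal sketch of an
alternative using java.util.UUID follows; the class and method names are
hypothetical.

```java
import java.util.UUID;

/** Builds correlation IDs and matching selectors for request/reply. */
final class CorrelationIds {

    /** Returns a random (type 4) UUID string to use as the JMSCorrelationID. */
    static String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    /** Builds the selector in the same form that publish() uses. */
    static String selectorFor(String correlationId) {
        return "JMSCorrelationID='" + correlationId + "'";
    }

    public static void main(String[] args) {
        String id = newCorrelationId();
        System.out.println(selectorFor(id));
    }
}
```

A UUID string contains only hex digits and hyphens, so it can be placed
inside the quoted selector literal without escaping.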


*Listener/Consumer:*

public class HeartBeatListener implements SessionAwareMessageListener
{

    private final Log LOG = LogFactory.getLog(this.getClass());

    @Override
    public void onMessage(Message message, Session session) throws JMSException
    {
        if (!(message instanceof TextMessage)) {
            throw new IllegalArgumentException(
                "Message must be of type TextMessage: " + message);
        }

        String replyTextMessage = "Heartbeat Ack.";

        try
        {
            TextMessage textMessage = (TextMessage) message;
            String msg = textMessage.getText();

            LOG.debug("Received heart beat message : " + msg);

            // Send the response to the destination specified by the
            // 'JMSReplyTo' field of the received message.
            Destination responseDest = message.getJMSReplyTo();
            if (responseDest != null)
            {
                LOG.debug("Sending response to destination" +
                    responseDest.toString());

                // Setup a message producer for the above destination
                MessageProducer producer =
                    session.createProducer(responseDest);
                TextMessage responseMessage =
                    session.createTextMessage(replyTextMessage);

                responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());

                // Send the response back
                producer.send(responseMessage);
                LOG.debug("Heart Beat Response Sent: " + responseMessage);
            }
        }
        catch (JMSException e)
        {
            LOG.error("Error while processing the message " + message, e);
        }
    }
}

This listener is used in a DMLC.

Hi Art,
Here the session is created on the producer side (in the publish() function),
and on the listener side a session-aware listener is used. My question was:
what happens if the session on the publisher side is closed before the
session-aware listener is even called, or while it is executing onMessage()?

Please tell me if I am doing anything wrong here.

Thanks for all the help.
-Abhi





Re: Consumer not able to consumer messages from queue

Posted by xabhi <xa...@gmail.com>.
Hi Art,
Can you also please take a look at
http://activemq.2283324.n4.nabble.com/Consumer-not-able-to-consume-messages-from-queue-tp4689594p4689835.html

Thanks,
Abhi




Re: Consumer not able to consumer messages from queue

Posted by artnaseef <ar...@artnaseef.com>.
Good point - when using this type of selector scenario, using message TTLs or
some other means of making sure messages are consumed is important.

As mentioned earlier, grabbing a stack trace may be telling, although it may
be a little more difficult to understand since the DMLC is involved.

The impact of closing a session that was created by the DMLC is a good
question as well.  Unless there is documentation for the Spring DMLC that
shows it is safe to do, or you have read the code to confirm it, I would
avoid doing so.  Also, closing the session seems odd to me: if there is a
problem with the connection, closing the connection would be necessary, and
closing the session would be unlikely to fix it.  If there is a problem
within a single session, that is odd.

One thing to note - having producers and consumers sharing the same
connection can lead to a deadlock when producer-flow-control kicks in,
depending on the messaging pattern of the clients involved.




Re: Consumer not able to consumer messages from queue

Posted by Tim Bain <tb...@alumni.duke.edu>.
You say you're doing request-response with a 60-second timeout on the
response.  If some of those responses took longer than 60 seconds, I'd
expect that that particular response message wouldn't be removed from the
response queue, and after some number of them I'd expect that the response
queue would be full of messages that were never going to be consumed and
producer flow control would kick in for the response queue.  Depending on
how you've implemented your producer, this might or might not prevent it
from publishing any more requests.  At a minimum, you should make sure
you've thought about what should happen to messages that are not consumed
due to the 60-second timeout, even if this doesn't turn out to be the root
cause for your problem...

Do you have any messages in the broker logs about producer flow control?
And do you have any way to find out how many messages were in the response
queue at the time this all went down?

Also, are you sure you're closing all of your consumers properly?
Selectors are expensive for a broker to evaluate, so if it's still
evaluating whether to deliver each message to every single consumer that's
ever connected, that might explain the slowdown you see (though it doesn't
explain why it completely stops processing data, so this may not be it).

On Mon, Jan 12, 2015 at 4:03 AM, xabhi <xa...@gmail.com> wrote:

> Please help; I am not able to get anywhere with this issue.
>
> At least tell me what additional information to collect and what to look
> out for the next time this happens, so that I can debug and resolve it.
>
> Thanks,
> Abhi

Re: Consumer not able to consumer messages from queue

Posted by xabhi <xa...@gmail.com>.
Please help; I am not able to get anywhere with this issue.

At least tell me what additional information to collect and what to look out
for the next time this happens, so that I can debug and resolve it.

Thanks,
Abhi




Re: Consumer not able to consumer messages from queue

Posted by xabhi <xa...@gmail.com>.
Thanks for the reply. I don't have a stack trace at the moment, and I have
not been able to reproduce this issue.
In my broker I log hourly destination stats, and for this particular queue I
observed that message consumption doesn't stop immediately; rather, it tails
off over time.

[destName: queue://app.heartBeat | enqueueCount: 160014 | dequeueCount:
160014 | dispatchCount: 160014 | expiredCount: 0 | inflightCount: 0 |
msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 |
memoryLimit: 134217728 | avgEnqueueTimeMs: 23540.421475620882 |
maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 |
currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 |
minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1182958991087 |
totalMsgSize: 169153727] – all messages consumed till now.

[destName: queue://app.heartBeat | enqueueCount: 160408 | dequeueCount:
160274 | dispatchCount: 160274 | expiredCount: 0 | inflightCount: 0 |
msgsHeld: 134 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 |
memoryLimit: 134217728 | avgEnqueueTimeMs: 23502.23844790796 |
maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 |
currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 |
minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1288090369558 |
totalMsgSize: 169571918] – messages consumed 260/394, held - 134

[destName: queue://app.heartBeat | enqueueCount: 160697 | dequeueCount:
160462 | dispatchCount: 160462 | expiredCount: 0 | inflightCount: 0 |
msgsHeld: 235 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 |
memoryLimit: 134217728 | avgEnqueueTimeMs: 24854.09577345415 |
maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 |
currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 |
minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1415956738458 |
totalMsgSize: 169879483] – messages consumed 188/289

[destName: queue://app.heartBeat | enqueueCount: 160979 | dequeueCount:
160462 | dispatchCount: 160462 | expiredCount: 0 | inflightCount: 0 |
msgsHeld: 517 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 |
memoryLimit: 134217728 | avgEnqueueTimeMs: 24854.09577345415 |
maxEnqueueTimeMs: 3159190 | minEnqueueTimeMs: 1 | currentConsumers: 10 |
currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 |
minMsgSize: 1033 | maxMsgSize: 1074 | avgMsgSize: 1057.1541691773461 |
totalMsgSize: 170179621] – messages consumed 0/282

After this no messages are consumed. Another thing is that the usual hourly
heartbeat message rate is 492 messages/hour but it gradually decreases
during this time to 282.
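
The per-hour figures annotated on the log lines above are just the
differences between consecutive cumulative counters. Here is a small Java
sketch of that arithmetic, using the enqueueCount and dequeueCount values
copied from the four snapshots (the class and method names are made up for
illustration):

```java
import java.util.ArrayList;
import java.util.List;

/** Derives per-interval throughput from cumulative destination counters. */
final class StatsDelta {

    /**
     * Each snapshot is {enqueueCount, dequeueCount}. Returns one
     * "dequeued/enqueued" entry per pair of consecutive snapshots.
     */
    static List<String> hourlyDeltas(long[][] snapshots) {
        List<String> out = new ArrayList<>();
        for (int i = 1; i < snapshots.length; i++) {
            long enqueued = snapshots[i][0] - snapshots[i - 1][0];
            long dequeued = snapshots[i][1] - snapshots[i - 1][1];
            out.add(dequeued + "/" + enqueued);
        }
        return out;
    }

    public static void main(String[] args) {
        // {enqueueCount, dequeueCount} from the four hourly log lines.
        long[][] snapshots = {
            {160014, 160014},
            {160408, 160274},
            {160697, 160462},
            {160979, 160462},
        };
        System.out.println(hourlyDeltas(snapshots)); // [260/394, 188/289, 0/282]
    }
}
```

In each snapshot, msgsHeld equals enqueueCount minus dequeueCount (134, 235,
517), which is consistent with expiredCount staying at 0.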

In my producer, I create a consumer and wait for the acknowledgement using a
consumer.receive() call with a 60s timeout. On the listener side, I send back
the acknowledgement for the heartbeat by creating a producer on the same
underlying session that I get in the session-aware listener. I am using
selectors for this kind of heartbeat messaging and acknowledgement (with
persistent messages).

I have one question: the producer closes the session under which it sent the
heartbeat message once it fails to receive an acknowledgement (maybe due to a
timeout). What will be the behavior of the session-aware listener on the
consumer side (which will use the same session to send back the ack)?

Thanks,
Abhi


Does this behavior point in some direction I should look in?




Re: Consumer not able to consumer messages from queue

Posted by artnaseef <ar...@artnaseef.com>.
I would look at a stack trace inside the application - try to find the
consumer threads and see if they are active, and if so, what they are doing.
If all is normal, they should be waiting to receive a message from the
ActiveMQ transport.
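
If attaching jstack to the process is inconvenient, the same information can
be collected from inside the application. A minimal sketch using only the JDK
follows. The class name is hypothetical, and the thread-name prefix is an
assumption: DMLC listener threads are named according to how the container
and its task executor are configured, so check the names in your own dump.

```java
import java.util.Map;

/** Dumps the stacks of live threads whose names start with a given prefix. */
final class ConsumerThreadDump {

    /** Returns name, state, and stack frames for each matching thread. */
    static String dump(String namePrefix) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (!t.getName().startsWith(namePrefix)) {
                continue;
            }
            sb.append('"').append(t.getName()).append("\" state=")
              .append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Assumed prefix; replace it with the names your listener threads use.
        System.out.print(dump("DefaultMessageListenerContainer-"));
    }
}
```

Calling dump() from a scheduled task or a JMX operation when msgsHeld starts
climbing would show whether the listener threads are idle in a receive call
or stuck somewhere else, such as in a send.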

Also, check the broker logs for messages.  Anything related to producer flow
control helps.

Does a restart of the consumer application correct the problem, or is a
broker restart absolutely necessary?


