You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Jose Martinez <jm...@opencrowd.com> on 2012/12/24 17:50:24 UTC

Embedded Broker/Listener becomes inactive and doesn't dequeue messages

Hello,

I've implemented an embedded activemq service and a message listener using
in activemq 5.7.0. It works fine for a few hours but I've noticed that
after a day or two of inactivity messages stop getting consumed and I keep
getting the following message in the logs:

16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
elapsed since last write check.
16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
WriteCheck[tcp://my.ip.addr:61616]
16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
elapsed since last write check.
16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
WriteCheck[tcp://my.ip.addr:61616]
16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
elapsed since last write check.
16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
WriteCheck[tcp://my.ip.addr:61616]


It looks like my connection becomes inactive after awhile and messages get
stuck in the queue. I can see in netstat the several ports
remain occupied but messages are not dequeued.

Here's what my embedded service looks like (implemented as a servlet in
tomcat):

public void init(ServletConfig config) throws ServletException {
super.init(config);
 logger.info("Initializing message queue listener");
 try {
BrokerService broker = new BrokerService();

TransportConnector connector = new TransportConnector();
connector.setUri(new URI("tcp://localhost:61616"));
broker.addConnector(connector);
broker.setPersistent(false);
broker.getSystemUsage().getTempUsage().setLimit(2000);
broker.start();
EntityXMLConsumer.startInstance(); /this is the listener
} catch (Exception e) {
logger.error(e);
}
}


And here's my listener.

public class EntityXMLConsumer implements MessageListener,
ExceptionListener{
 private static Logger logger = Logger.getLogger(EntityXMLConsumer.class);
 private static ConfigManager configManager = ConfigManager.getInstance();
private static String url =
configManager.getProperty("popworkflowsvc.activemq.url");
//private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "WORKFLOW";
    private static Session session;
    private MessageConsumer consumer;
    private static Connection connection;
    private static EntityXMLConsumer instance = null;

private EntityXMLConsumer() throws JMSException{
logger.info("Initializing message listener on url :" + url);
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
               // to use transactions you should set the first parameter to
'true'
               session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
               Destination destination = session.createQueue(subject);
               consumer = session.createConsumer(destination);

        consumer.setMessageListener(this);
            connection.start();
}
 public static EntityXMLConsumer startInstance() throws JMSException {
 if (instance == null){
instance = new EntityXMLConsumer();
}
 return instance;
}
 public static void closeConnection() {
        try {
         logger.info("closing connection");
connection.close();
} catch (JMSException e) {
logger.error(e);
}
}


public void onException(JMSException ex) {
logger.error(ex);
closeConnection();
}


public void onMessage(Message message) {
        final String MSG_ID = "ENTITY_DATA";
        PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();

    try {
logger.info("message received: " + message );
if (message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
for (Object messageKey: payloadMap.keySet()){
logger.info("Module ID: " + messageKey);
Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
String[].class);
service.initWorkflow(strEntityXML , (String)messageKey);
logger.debug("Entity XML " + payloadMap.get(messageKey));
}
}
     } catch (JMSException e) {
     logger.error(e);
     closeConnection();
}
}
}


So it looks like either the listener or the broker stop responding and
messages get stuck in the queue. Are there any additional parameters I need
to configure? Any pointers appreciated.


Thanks for your help.

Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages

Posted by Jose Martinez <jm...@opencrowd.com>.
Thanks. I'll have to look into how to get the JConsole screenshot as I've
never done it before. Do you think there might be something wrong with the
my listener? I suspect the broker is working fine but the listener is the
one that is dying. I'll try to debug with the JConsole. Thanks!


On Tue, Dec 25, 2012 at 2:49 PM, Raul Kripalani <ra...@evosent.com> wrote:

> Can you attach a JConsole screenshot of the Broker MBean attributes?
>
> Does this happen with just one destination or with all?
>
> The Inactivity Monitor WriteChecker is doing exactly what you pursue:
> keeping the connection alive by sending pings back and forth.
>
> It doesn't report errors, so it looks like the connections are okay. The
> Broker JMX screenshot may tell us what's going on.
>
> Raúl.
> On 25 Dec 2012 19:33, "Jose Martinez" <jm...@opencrowd.com> wrote:
>
> > Seems like i keep having the issue even after increasing the limit. I
> > wonder what does the 'WriteChecker" message really means. And why can't
> the
> > connection remain stable? Is there a way to automatically recycle the
> > connection to keep it "fresh"?
> >
> > Any suggestions appreciated.
> >
> >
> > On Mon, Dec 24, 2012 at 1:01 PM, Jose Martinez <jmartinez@opencrowd.com
> > >wrote:
> >
> > > Thanks!
> > >
> > > I'll set it as follows:
> > >     broker.getSystemUsage().getTempUsage().setLimit(1024l*64l);
> > >
> > > And see how it goes.
> > >
> > >
> > >
> > > On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <ra...@evosent.com>
> > wrote:
> > >
> > >> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected
> > 2000
> > >> to mean megabytes, but the unit is bytes. See Javadoc [1].
> > >>
> > >> In a non-persistent broker, the role of tempUsage is to buffer up
> > >> non-persistent messages when consumers can't keep up. The temp storage
> > >> implementation (PListStore) uses a default journal size of 32mb. So
> your
> > >> system locks up as soon as it lazy inits the PListStore for the first
> > >> time,
> > >> because tempUsage < default journal file size.
> > >>
> > >> My two cents. Please let us know if this helped.
> > >>
> > >> Thanks,
> > >>
> > >> [1]
> > >>
> > >>
> >
> http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
> > >> .
> > >>
> > >> *Raúl Kripalani*
> > >> Apache Camel Committer
> > >> Enterprise Architect, Program Manager, Open Source Integration
> > specialist
> > >> http://about.me/raulkripalani |
> > http://www.linkedin.com/in/raulkripalani
> > >> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>
> > >>
> > >> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <
> jmartinez@opencrowd.com
> > >> >wrote:
> > >>
> > >> > Hello,
> > >> >
> > >> > I've implemented an embedded activemq service and a message listener
> > >> using
> > >> > in activemq 5.7.0. It works fine for a few hours but I've noticed
> that
> > >> > after a day or two of inactivity messages stop getting consumed and
> I
> > >> keep
> > >> > getting the following message in the logs:
> > >> >
> > >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> > >> > elapsed since last write check.
> > >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
> > >> > WriteCheck[tcp://my.ip.addr:61616]
> > >> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> > >> > elapsed since last write check.
> > >> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
> > >> > WriteCheck[tcp://my.ip.addr:61616]
> > >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> > >> > elapsed since last write check.
> > >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
> > >> > WriteCheck[tcp://my.ip.addr:61616]
> > >> >
> > >> >
> > >> > It looks like my connection becomes inactive after awhile and
> messages
> > >> get
> > >> > stuck in the queue. I can see in netstat the several ports
> > >> > remain occupied but messages are not dequeued.
> > >> >
> > >> > Here's what my embedded service looks like (implemented as a servlet
> > in
> > >> > tomcat):
> > >> >
> > >> > public void init(ServletConfig config) throws ServletException {
> > >> > super.init(config);
> > >> >  logger.info("Initializing message queue listener");
> > >> >  try {
> > >> > BrokerService broker = new BrokerService();
> > >> >
> > >> > TransportConnector connector = new TransportConnector();
> > >> > connector.setUri(new URI("tcp://localhost:61616"));
> > >> > broker.addConnector(connector);
> > >> > broker.setPersistent(false);
> > >> > broker.getSystemUsage().getTempUsage().setLimit(2000);
> > >> > broker.start();
> > >> > EntityXMLConsumer.startInstance(); /this is the listener
> > >> > } catch (Exception e) {
> > >> > logger.error(e);
> > >> > }
> > >> > }
> > >> >
> > >> >
> > >> > And here's my listener.
> > >> >
> > >> > public class EntityXMLConsumer implements MessageListener,
> > >> > ExceptionListener{
> > >> >  private static Logger logger =
> > >> Logger.getLogger(EntityXMLConsumer.class);
> > >> >  private static ConfigManager configManager =
> > >> ConfigManager.getInstance();
> > >> > private static String url =
> > >> > configManager.getProperty("popworkflowsvc.activemq.url");
> > >> > //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
> > >> >     private static String subject = "WORKFLOW";
> > >> >     private static Session session;
> > >> >     private MessageConsumer consumer;
> > >> >     private static Connection connection;
> > >> >     private static EntityXMLConsumer instance = null;
> > >> >
> > >> > private EntityXMLConsumer() throws JMSException{
> > >> > logger.info("Initializing message listener on url :" + url);
> > >> > ActiveMQConnectionFactory connectionFactory = new
> > >> > ActiveMQConnectionFactory(url);
> > >> >                 connection = connectionFactory.createConnection();
> > >> >                // to use transactions you should set the first
> > >> parameter to
> > >> > 'true'
> > >> >                session = connection.createSession(false,
> > >> > Session.AUTO_ACKNOWLEDGE);
> > >> >                Destination destination =
> session.createQueue(subject);
> > >> >                consumer = session.createConsumer(destination);
> > >> >
> > >> >         consumer.setMessageListener(this);
> > >> >             connection.start();
> > >> > }
> > >> >  public static EntityXMLConsumer startInstance() throws
> JMSException {
> > >> >  if (instance == null){
> > >> > instance = new EntityXMLConsumer();
> > >> > }
> > >> >  return instance;
> > >> > }
> > >> >  public static void closeConnection() {
> > >> >         try {
> > >> >          logger.info("closing connection");
> > >> > connection.close();
> > >> > } catch (JMSException e) {
> > >> > logger.error(e);
> > >> > }
> > >> > }
> > >> >
> > >> >
> > >> > public void onException(JMSException ex) {
> > >> > logger.error(ex);
> > >> > closeConnection();
> > >> > }
> > >> >
> > >> >
> > >> > public void onMessage(Message message) {
> > >> >         final String MSG_ID = "ENTITY_DATA";
> > >> >         PopWorkflowServiceSEI service = new
> PopWorkflowServiceImpl();
> > >> >
> > >> >     try {
> > >> > logger.info("message received: " + message );
> > >> > if (message instanceof MapMessage){
> > >> > MapMessage mapMessage = (MapMessage) message;
> > >> > Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
> > >> > for (Object messageKey: payloadMap.keySet()){
> > >> > logger.info("Module ID: " + messageKey);
> > >> > Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
> > >> > String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
> > >> > String[].class);
> > >> > service.initWorkflow(strEntityXML , (String)messageKey);
> > >> > logger.debug("Entity XML " + payloadMap.get(messageKey));
> > >> > }
> > >> > }
> > >> >      } catch (JMSException e) {
> > >> >      logger.error(e);
> > >> >      closeConnection();
> > >> > }
> > >> > }
> > >> > }
> > >> >
> > >> >
> > >> > So it looks like either the listener or the broker stop responding
> and
> > >> > messages get stuck in the queue. Are there any additional
> parameters I
> > >> need
> > >> > to configure? Any pointers appreciated.
> > >> >
> > >> >
> > >> > Thanks for your help.
> > >> >
> > >>
> > >
> > >
> >
>

Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages

Posted by Raul Kripalani <ra...@evosent.com>.
Can you attach a JConsole screenshot of the Broker MBean attributes?

Does this happen with just one destination or with all?

The Inactivity Monitor WriteChecker is doing exactly what you pursue:
keeping the connection alive by sending pings back and forth.

It doesn't report errors, so it looks like the connections are okay. The
Broker JMX screenshot may tell us what's going on.

Raúl.
On 25 Dec 2012 19:33, "Jose Martinez" <jm...@opencrowd.com> wrote:

> Seems like i keep having the issue even after increasing the limit. I
> wonder what does the 'WriteChecker" message really means. And why can't the
> connection remain stable? Is there a way to automatically recycle the
> connection to keep it "fresh"?
>
> Any suggestions appreciated.
>
>
> On Mon, Dec 24, 2012 at 1:01 PM, Jose Martinez <jmartinez@opencrowd.com
> >wrote:
>
> > Thanks!
> >
> > I'll set it as follows:
> >     broker.getSystemUsage().getTempUsage().setLimit(1024l*64l);
> >
> > And see how it goes.
> >
> >
> >
> > On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <ra...@evosent.com>
> wrote:
> >
> >> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected
> 2000
> >> to mean megabytes, but the unit is bytes. See Javadoc [1].
> >>
> >> In a non-persistent broker, the role of tempUsage is to buffer up
> >> non-persistent messages when consumers can't keep up. The temp storage
> >> implementation (PListStore) uses a default journal size of 32mb. So your
> >> system locks up as soon as it lazy inits the PListStore for the first
> >> time,
> >> because tempUsage < default journal file size.
> >>
> >> My two cents. Please let us know if this helped.
> >>
> >> Thanks,
> >>
> >> [1]
> >>
> >>
> http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
> >> .
> >>
> >> *Raúl Kripalani*
> >> Apache Camel Committer
> >> Enterprise Architect, Program Manager, Open Source Integration
> specialist
> >> http://about.me/raulkripalani |
> http://www.linkedin.com/in/raulkripalani
> >> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>
> >>
> >> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jmartinez@opencrowd.com
> >> >wrote:
> >>
> >> > Hello,
> >> >
> >> > I've implemented an embedded activemq service and a message listener
> >> using
> >> > in activemq 5.7.0. It works fine for a few hours but I've noticed that
> >> > after a day or two of inactivity messages stop getting consumed and I
> >> keep
> >> > getting the following message in the logs:
> >> >
> >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> >> > elapsed since last write check.
> >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
> >> > WriteCheck[tcp://my.ip.addr:61616]
> >> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> >> > elapsed since last write check.
> >> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
> >> > WriteCheck[tcp://my.ip.addr:61616]
> >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> >> > elapsed since last write check.
> >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
> >> > WriteCheck[tcp://my.ip.addr:61616]
> >> >
> >> >
> >> > It looks like my connection becomes inactive after awhile and messages
> >> get
> >> > stuck in the queue. I can see in netstat the several ports
> >> > remain occupied but messages are not dequeued.
> >> >
> >> > Here's what my embedded service looks like (implemented as a servlet
> in
> >> > tomcat):
> >> >
> >> > public void init(ServletConfig config) throws ServletException {
> >> > super.init(config);
> >> >  logger.info("Initializing message queue listener");
> >> >  try {
> >> > BrokerService broker = new BrokerService();
> >> >
> >> > TransportConnector connector = new TransportConnector();
> >> > connector.setUri(new URI("tcp://localhost:61616"));
> >> > broker.addConnector(connector);
> >> > broker.setPersistent(false);
> >> > broker.getSystemUsage().getTempUsage().setLimit(2000);
> >> > broker.start();
> >> > EntityXMLConsumer.startInstance(); /this is the listener
> >> > } catch (Exception e) {
> >> > logger.error(e);
> >> > }
> >> > }
> >> >
> >> >
> >> > And here's my listener.
> >> >
> >> > public class EntityXMLConsumer implements MessageListener,
> >> > ExceptionListener{
> >> >  private static Logger logger =
> >> Logger.getLogger(EntityXMLConsumer.class);
> >> >  private static ConfigManager configManager =
> >> ConfigManager.getInstance();
> >> > private static String url =
> >> > configManager.getProperty("popworkflowsvc.activemq.url");
> >> > //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
> >> >     private static String subject = "WORKFLOW";
> >> >     private static Session session;
> >> >     private MessageConsumer consumer;
> >> >     private static Connection connection;
> >> >     private static EntityXMLConsumer instance = null;
> >> >
> >> > private EntityXMLConsumer() throws JMSException{
> >> > logger.info("Initializing message listener on url :" + url);
> >> > ActiveMQConnectionFactory connectionFactory = new
> >> > ActiveMQConnectionFactory(url);
> >> >                 connection = connectionFactory.createConnection();
> >> >                // to use transactions you should set the first
> >> parameter to
> >> > 'true'
> >> >                session = connection.createSession(false,
> >> > Session.AUTO_ACKNOWLEDGE);
> >> >                Destination destination = session.createQueue(subject);
> >> >                consumer = session.createConsumer(destination);
> >> >
> >> >         consumer.setMessageListener(this);
> >> >             connection.start();
> >> > }
> >> >  public static EntityXMLConsumer startInstance() throws JMSException {
> >> >  if (instance == null){
> >> > instance = new EntityXMLConsumer();
> >> > }
> >> >  return instance;
> >> > }
> >> >  public static void closeConnection() {
> >> >         try {
> >> >          logger.info("closing connection");
> >> > connection.close();
> >> > } catch (JMSException e) {
> >> > logger.error(e);
> >> > }
> >> > }
> >> >
> >> >
> >> > public void onException(JMSException ex) {
> >> > logger.error(ex);
> >> > closeConnection();
> >> > }
> >> >
> >> >
> >> > public void onMessage(Message message) {
> >> >         final String MSG_ID = "ENTITY_DATA";
> >> >         PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();
> >> >
> >> >     try {
> >> > logger.info("message received: " + message );
> >> > if (message instanceof MapMessage){
> >> > MapMessage mapMessage = (MapMessage) message;
> >> > Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
> >> > for (Object messageKey: payloadMap.keySet()){
> >> > logger.info("Module ID: " + messageKey);
> >> > Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
> >> > String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
> >> > String[].class);
> >> > service.initWorkflow(strEntityXML , (String)messageKey);
> >> > logger.debug("Entity XML " + payloadMap.get(messageKey));
> >> > }
> >> > }
> >> >      } catch (JMSException e) {
> >> >      logger.error(e);
> >> >      closeConnection();
> >> > }
> >> > }
> >> > }
> >> >
> >> >
> >> > So it looks like either the listener or the broker stop responding and
> >> > messages get stuck in the queue. Are there any additional parameters I
> >> need
> >> > to configure? Any pointers appreciated.
> >> >
> >> >
> >> > Thanks for your help.
> >> >
> >>
> >
> >
>

Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages

Posted by Jose Martinez <jm...@opencrowd.com>.
Seems like i keep having the issue even after increasing the limit. I
wonder what does the 'WriteChecker" message really means. And why can't the
connection remain stable? Is there a way to automatically recycle the
connection to keep it "fresh"?

Any suggestions appreciated.


On Mon, Dec 24, 2012 at 1:01 PM, Jose Martinez <jm...@opencrowd.com>wrote:

> Thanks!
>
> I'll set it as follows:
>     broker.getSystemUsage().getTempUsage().setLimit(1024l*64l);
>
> And see how it goes.
>
>
>
> On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <ra...@evosent.com> wrote:
>
>> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected 2000
>> to mean megabytes, but the unit is bytes. See Javadoc [1].
>>
>> In a non-persistent broker, the role of tempUsage is to buffer up
>> non-persistent messages when consumers can't keep up. The temp storage
>> implementation (PListStore) uses a default journal size of 32mb. So your
>> system locks up as soon as it lazy inits the PListStore for the first
>> time,
>> because tempUsage < default journal file size.
>>
>> My two cents. Please let us know if this helped.
>>
>> Thanks,
>>
>> [1]
>>
>> http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
>> .
>>
>> *Raúl Kripalani*
>> Apache Camel Committer
>> Enterprise Architect, Program Manager, Open Source Integration specialist
>> http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
>> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>
>>
>> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jmartinez@opencrowd.com
>> >wrote:
>>
>> > Hello,
>> >
>> > I've implemented an embedded activemq service and a message listener
>> using
>> > in activemq 5.7.0. It works fine for a few hours but I've noticed that
>> > after a day or two of inactivity messages stop getting consumed and I
>> keep
>> > getting the following message in the logs:
>> >
>> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
>> > elapsed since last write check.
>> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
>> > WriteCheck[tcp://my.ip.addr:61616]
>> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
>> > elapsed since last write check.
>> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
>> > WriteCheck[tcp://my.ip.addr:61616]
>> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
>> > elapsed since last write check.
>> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
>> > WriteCheck[tcp://my.ip.addr:61616]
>> >
>> >
>> > It looks like my connection becomes inactive after awhile and messages
>> get
>> > stuck in the queue. I can see in netstat the several ports
>> > remain occupied but messages are not dequeued.
>> >
>> > Here's what my embedded service looks like (implemented as a servlet in
>> > tomcat):
>> >
>> > public void init(ServletConfig config) throws ServletException {
>> > super.init(config);
>> >  logger.info("Initializing message queue listener");
>> >  try {
>> > BrokerService broker = new BrokerService();
>> >
>> > TransportConnector connector = new TransportConnector();
>> > connector.setUri(new URI("tcp://localhost:61616"));
>> > broker.addConnector(connector);
>> > broker.setPersistent(false);
>> > broker.getSystemUsage().getTempUsage().setLimit(2000);
>> > broker.start();
>> > EntityXMLConsumer.startInstance(); /this is the listener
>> > } catch (Exception e) {
>> > logger.error(e);
>> > }
>> > }
>> >
>> >
>> > And here's my listener.
>> >
>> > public class EntityXMLConsumer implements MessageListener,
>> > ExceptionListener{
>> >  private static Logger logger =
>> Logger.getLogger(EntityXMLConsumer.class);
>> >  private static ConfigManager configManager =
>> ConfigManager.getInstance();
>> > private static String url =
>> > configManager.getProperty("popworkflowsvc.activemq.url");
>> > //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>> >     private static String subject = "WORKFLOW";
>> >     private static Session session;
>> >     private MessageConsumer consumer;
>> >     private static Connection connection;
>> >     private static EntityXMLConsumer instance = null;
>> >
>> > private EntityXMLConsumer() throws JMSException{
>> > logger.info("Initializing message listener on url :" + url);
>> > ActiveMQConnectionFactory connectionFactory = new
>> > ActiveMQConnectionFactory(url);
>> >                 connection = connectionFactory.createConnection();
>> >                // to use transactions you should set the first
>> parameter to
>> > 'true'
>> >                session = connection.createSession(false,
>> > Session.AUTO_ACKNOWLEDGE);
>> >                Destination destination = session.createQueue(subject);
>> >                consumer = session.createConsumer(destination);
>> >
>> >         consumer.setMessageListener(this);
>> >             connection.start();
>> > }
>> >  public static EntityXMLConsumer startInstance() throws JMSException {
>> >  if (instance == null){
>> > instance = new EntityXMLConsumer();
>> > }
>> >  return instance;
>> > }
>> >  public static void closeConnection() {
>> >         try {
>> >          logger.info("closing connection");
>> > connection.close();
>> > } catch (JMSException e) {
>> > logger.error(e);
>> > }
>> > }
>> >
>> >
>> > public void onException(JMSException ex) {
>> > logger.error(ex);
>> > closeConnection();
>> > }
>> >
>> >
>> > public void onMessage(Message message) {
>> >         final String MSG_ID = "ENTITY_DATA";
>> >         PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();
>> >
>> >     try {
>> > logger.info("message received: " + message );
>> > if (message instanceof MapMessage){
>> > MapMessage mapMessage = (MapMessage) message;
>> > Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
>> > for (Object messageKey: payloadMap.keySet()){
>> > logger.info("Module ID: " + messageKey);
>> > Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
>> > String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
>> > String[].class);
>> > service.initWorkflow(strEntityXML , (String)messageKey);
>> > logger.debug("Entity XML " + payloadMap.get(messageKey));
>> > }
>> > }
>> >      } catch (JMSException e) {
>> >      logger.error(e);
>> >      closeConnection();
>> > }
>> > }
>> > }
>> >
>> >
>> > So it looks like either the listener or the broker stop responding and
>> > messages get stuck in the queue. Are there any additional parameters I
>> need
>> > to configure? Any pointers appreciated.
>> >
>> >
>> > Thanks for your help.
>> >
>>
>
>

Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages

Posted by Jose Martinez <jm...@opencrowd.com>.
Thanks!

I'll set it as follows:
    broker.getSystemUsage().getTempUsage().setLimit(1024l*64l);

And see how it goes.



On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <ra...@evosent.com> wrote:

> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected 2000
> to mean megabytes, but the unit is bytes. See Javadoc [1].
>
> In a non-persistent broker, the role of tempUsage is to buffer up
> non-persistent messages when consumers can't keep up. The temp storage
> implementation (PListStore) uses a default journal size of 32mb. So your
> system locks up as soon as it lazy inits the PListStore for the first time,
> because tempUsage < default journal file size.
>
> My two cents. Please let us know if this helped.
>
> Thanks,
>
> [1]
>
> http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
> .
>
> *Raúl Kripalani*
> Apache Camel Committer
> Enterprise Architect, Program Manager, Open Source Integration specialist
> http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>
>
> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jmartinez@opencrowd.com
> >wrote:
>
> > Hello,
> >
> > I've implemented an embedded activemq service and a message listener
> using
> > in activemq 5.7.0. It works fine for a few hours but I've noticed that
> > after a day or two of inactivity messages stop getting consumed and I
> keep
> > getting the following message in the logs:
> >
> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> > elapsed since last write check.
> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
> > WriteCheck[tcp://my.ip.addr:61616]
> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> > elapsed since last write check.
> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
> > WriteCheck[tcp://my.ip.addr:61616]
> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> > elapsed since last write check.
> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
> > WriteCheck[tcp://my.ip.addr:61616]
> >
> >
> > It looks like my connection becomes inactive after awhile and messages
> get
> > stuck in the queue. I can see in netstat the several ports
> > remain occupied but messages are not dequeued.
> >
> > Here's what my embedded service looks like (implemented as a servlet in
> > tomcat):
> >
> > public void init(ServletConfig config) throws ServletException {
> > super.init(config);
> >  logger.info("Initializing message queue listener");
> >  try {
> > BrokerService broker = new BrokerService();
> >
> > TransportConnector connector = new TransportConnector();
> > connector.setUri(new URI("tcp://localhost:61616"));
> > broker.addConnector(connector);
> > broker.setPersistent(false);
> > broker.getSystemUsage().getTempUsage().setLimit(2000);
> > broker.start();
> > EntityXMLConsumer.startInstance(); /this is the listener
> > } catch (Exception e) {
> > logger.error(e);
> > }
> > }
> >
> >
> > And here's my listener.
> >
> > public class EntityXMLConsumer implements MessageListener,
> > ExceptionListener{
> >  private static Logger logger =
> Logger.getLogger(EntityXMLConsumer.class);
> >  private static ConfigManager configManager =
> ConfigManager.getInstance();
> > private static String url =
> > configManager.getProperty("popworkflowsvc.activemq.url");
> > //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
> >     private static String subject = "WORKFLOW";
> >     private static Session session;
> >     private MessageConsumer consumer;
> >     private static Connection connection;
> >     private static EntityXMLConsumer instance = null;
> >
> > private EntityXMLConsumer() throws JMSException{
> > logger.info("Initializing message listener on url :" + url);
> > ActiveMQConnectionFactory connectionFactory = new
> > ActiveMQConnectionFactory(url);
> >                 connection = connectionFactory.createConnection();
> >                // to use transactions you should set the first parameter
> to
> > 'true'
> >                session = connection.createSession(false,
> > Session.AUTO_ACKNOWLEDGE);
> >                Destination destination = session.createQueue(subject);
> >                consumer = session.createConsumer(destination);
> >
> >         consumer.setMessageListener(this);
> >             connection.start();
> > }
> >  public static EntityXMLConsumer startInstance() throws JMSException {
> >  if (instance == null){
> > instance = new EntityXMLConsumer();
> > }
> >  return instance;
> > }
> >  public static void closeConnection() {
> >         try {
> >          logger.info("closing connection");
> > connection.close();
> > } catch (JMSException e) {
> > logger.error(e);
> > }
> > }
> >
> >
> > public void onException(JMSException ex) {
> > logger.error(ex);
> > closeConnection();
> > }
> >
> >
> > public void onMessage(Message message) {
> >         final String MSG_ID = "ENTITY_DATA";
> >         PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();
> >
> >     try {
> > logger.info("message received: " + message );
> > if (message instanceof MapMessage){
> > MapMessage mapMessage = (MapMessage) message;
> > Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
> > for (Object messageKey: payloadMap.keySet()){
> > logger.info("Module ID: " + messageKey);
> > Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
> > String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
> > String[].class);
> > service.initWorkflow(strEntityXML , (String)messageKey);
> > logger.debug("Entity XML " + payloadMap.get(messageKey));
> > }
> > }
> >      } catch (JMSException e) {
> >      logger.error(e);
> >      closeConnection();
> > }
> > }
> > }
> >
> >
> > So it looks like either the listener or the broker stop responding and
> > messages get stuck in the queue. Are there any additional parameters I
> need
> > to configure? Any pointers appreciated.
> >
> >
> > Thanks for your help.
> >
>

Re: Embedded Broker/Listener becomes inactive and doesn't dequeue messages

Posted by Raul Kripalani <ra...@evosent.com>.
You are setting a tempUsage limit of 2 kilobytes. Maybe you expected 2000
to mean megabytes, but the unit is bytes. See Javadoc [1].

In a non-persistent broker, the role of tempUsage is to buffer up
non-persistent messages when consumers can't keep up. The temp storage
implementation (PListStore) uses a default journal size of 32mb. So your
system locks up as soon as it lazy inits the PListStore for the first time,
because tempUsage < default journal file size.

My two cents. Please let us know if this helped.

Thanks,

[1]
http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long)
.

*Raúl Kripalani*
Apache Camel Committer
Enterprise Architect, Program Manager, Open Source Integration specialist
http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani
http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk>

On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jm...@opencrowd.com>wrote:

> Hello,
>
> I've implemented an embedded activemq service and a message listener using
> in activemq 5.7.0. It works fine for a few hours but I've noticed that
> after a day or two of inactivity messages stop getting consumed and I keep
> getting the following message in the logs:
>
> 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> elapsed since last write check.
> 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running
> WriteCheck[tcp://my.ip.addr:61616]
> 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> elapsed since last write check.
> 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running
> WriteCheck[tcp://my.ip.addr:61616]
> 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms
> elapsed since last write check.
> 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running
> WriteCheck[tcp://my.ip.addr:61616]
>
>
> It looks like my connection becomes inactive after awhile and messages get
> stuck in the queue. I can see in netstat the several ports
> remain occupied but messages are not dequeued.
>
> Here's what my embedded service looks like (implemented as a servlet in
> tomcat):
>
> public void init(ServletConfig config) throws ServletException {
> super.init(config);
>  logger.info("Initializing message queue listener");
>  try {
> BrokerService broker = new BrokerService();
>
> TransportConnector connector = new TransportConnector();
> connector.setUri(new URI("tcp://localhost:61616"));
> broker.addConnector(connector);
> broker.setPersistent(false);
> broker.getSystemUsage().getTempUsage().setLimit(2000);
> broker.start();
> EntityXMLConsumer.startInstance(); /this is the listener
> } catch (Exception e) {
> logger.error(e);
> }
> }
>
>
> And here's my listener.
>
> public class EntityXMLConsumer implements MessageListener,
> ExceptionListener{
>  private static Logger logger = Logger.getLogger(EntityXMLConsumer.class);
>  private static ConfigManager configManager = ConfigManager.getInstance();
> private static String url =
> configManager.getProperty("popworkflowsvc.activemq.url");
> //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>     private static String subject = "WORKFLOW";
>     private static Session session;
>     private MessageConsumer consumer;
>     private static Connection connection;
>     private static EntityXMLConsumer instance = null;
>
> private EntityXMLConsumer() throws JMSException{
> logger.info("Initializing message listener on url :" + url);
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
>                 connection = connectionFactory.createConnection();
>                // to use transactions you should set the first parameter to
> 'true'
>                session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                Destination destination = session.createQueue(subject);
>                consumer = session.createConsumer(destination);
>
>         consumer.setMessageListener(this);
>             connection.start();
> }
>  public static EntityXMLConsumer startInstance() throws JMSException {
>  if (instance == null){
> instance = new EntityXMLConsumer();
> }
>  return instance;
> }
>  public static void closeConnection() {
>         try {
>          logger.info("closing connection");
> connection.close();
> } catch (JMSException e) {
> logger.error(e);
> }
> }
>
>
> public void onException(JMSException ex) {
> logger.error(ex);
> closeConnection();
> }
>
>
> public void onMessage(Message message) {
>         final String MSG_ID = "ENTITY_DATA";
>         PopWorkflowServiceSEI service = new PopWorkflowServiceImpl();
>
>     try {
> logger.info("message received: " + message );
> if (message instanceof MapMessage){
> MapMessage mapMessage = (MapMessage) message;
> Map payloadMap = (Map)mapMessage.getObject(MSG_ID);
> for (Object messageKey: payloadMap.keySet()){
> logger.info("Module ID: " + messageKey);
> Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray();
> String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length,
> String[].class);
> service.initWorkflow(strEntityXML , (String)messageKey);
> logger.debug("Entity XML " + payloadMap.get(messageKey));
> }
> }
>      } catch (JMSException e) {
>      logger.error(e);
>      closeConnection();
> }
> }
> }
>
>
> So it looks like either the listener or the broker stop responding and
> messages get stuck in the queue. Are there any additional parameters I need
> to configure? Any pointers appreciated.
>
>
> Thanks for your help.
>