Posted to users@camel.apache.org by "Geldhof, Kristof" <Kr...@ThomasCook.be> on 2019/01/21 14:59:41 UTC

how to detect recovery of activemq messageListenerContainer?

Hi,

I'm using Camel 2.23.0 and activemq-camel, and I'm consuming messages from a virtual topic. I want to detect when there are problems with the connection to the broker and when (automatic) recovery has succeeded. For the initial detection of a connection error I use an exception listener, which is triggered as expected. However, I have not managed to detect when the auto-recovery has succeeded (I need to be able to detect the recovery without a new message being consumed). I tried to use an instance of DefaultMessageListenerContainer and call its isRecovering() method to check whether recovery has finished.

from("imq:queue:Consumer.app.VirtualTopic.Update?messageListenerContainerFactoryRef=#myContainerFactory")
.log("Message received from virtual topic 'VirtualTopic.Update': ${body}")
...

An exception listener "JmsExceptionListener" listens for exceptions and triggers a route:

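import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class JmsExceptionListener implements ExceptionListener {

    @Autowired
    private ProducerTemplate producerTemplate;

    // Called by the JMS provider when it detects a problem with the connection.
    @Override
    public void onException(JMSException exception) {
        // Kick off the error route; the body is not used.
        producerTemplate.sendBody("direct:error", "");
    }
}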

Error route:

from("direct:error")
    .onException(Exception.class)
        ... (infinite retries with fixed delay)
    .end()
    .filter(method("myMessageListenerContainer", "isRecovering"))
        .throwException(new Exception("still recovering"))
    .end()
    .log("Connection restored")
    ...
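
The retry-until-recovered idea behind the error route above can be sketched in plain Java, outside Camel, as a fixed-delay polling loop. This is only an illustration under assumptions: the class name RecoveryPoller, the method awaitRecovery and the attempt limit are hypothetical and do not come from Camel or Spring, and the stand-in supplier in main() merely simulates a container that reports "recovering" for its first two polls.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Camel or Spring.
public class RecoveryPoller {

    /**
     * Polls the given check with a fixed delay until it reports "not recovering".
     * Returns the number of polls made, or -1 if maxAttempts was exhausted
     * or the waiting thread was interrupted.
     */
    public static int awaitRecovery(BooleanSupplier isRecovering, long delayMillis, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (!isRecovering.getAsBoolean()) {
                return attempt;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Stand-in for the container: reports "recovering" for the first two polls.
        int[] calls = {0};
        BooleanSupplier fakeContainer = () -> ++calls[0] <= 2;

        int polls = awaitRecovery(fakeContainer, 10, 5);
        System.out.println(polls > 0
                ? "Connection restored after " + polls + " polls"
                : "Still recovering");
    }
}
```

With a real container, the supplier could be a method reference such as myMessageListenerContainer::isRecovering. Like the route, this only helps if isRecovering() actually reflects the recovery state at the moment it is polled.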

Bean configuration:

<bean id="imq" class="org.apache.activemq.camel.component.ActiveMQComponent"
          p:acknowledgementModeName="CLIENT_ACKNOWLEDGE"
          p:receiveTimeout="30000"
          p:requestTimeout="30000"
          p:concurrentConsumers="1"
          p:cacheLevelName="CACHE_CONSUMER"
          p:testConnectionOnStartup="true"
          p:connectionFactory-ref="imqConnectionFactory"
          p:recoveryInterval="2000"/>

<amq:connectionFactory id="imqConnectionFactory"  consumerExpiryCheckEnabled="false"  exceptionListener="#amqExceptionListener"  brokerURL="${imq.broker.url}"/>
<amq:prefetchPolicy all="1"/>

<bean id="amqExceptionListener" class="be.myapp.listener.JmsExceptionListener"/>

<bean id="myMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
                p:connectionFactory-ref="imqConnectionFactory"
                p:destination-name="queue:Consumer.app.VirtualTopic.Update"/>

Classes:

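import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.jms.MessageListenerContainerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class MyMessageListenerContainerFactory implements MessageListenerContainerFactory {

    @Autowired
    private DefaultMessageListenerContainer myMessageListenerContainer;

    // Hands the Spring-managed container to the Camel JMS endpoint.
    @Override
    public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) {
        return myMessageListenerContainer;
    }
}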

When I manually bring down my locally running ActiveMQ broker, the exception listener is triggered correctly, but the isRecovering() method of myMessageListenerContainer returns false, while I had expected it to return true. What could be the cause of this? Is there a problem in my configuration? Is there any other way to easily detect the recovery state?

Best regards,
Kristof Geldhof

Re: how to detect recovery of activemq messageListenerContainer?

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I suggest diving into the Spring JMS code, as it's the library that
the Camel JMS component is built on. It has the logic for
recovery.


-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2