Posted to users@camel.apache.org by Richa <rs...@gmail.com> on 2014/07/17 12:22:28 UTC

ActiveMQ topic as input endpoint in Apache Camel

Hi,

I have a Camel route whose input endpoint is an ActiveMQ topic. I have added a
selector to the topic endpoint to check a header, but it looks like I am losing
messages on the topic. My topic configuration is as follows:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
if (!routeDetails.getInputEndPoint().startsWith("failover")) {
    // This method will generate the hostname which will act as broker url in this case.
    getBasicUrl(ACTIVE_MQ_TOPIC_PREFIX, true, INPUT);
    if (!Util.isNullOrEmpty(hostName, port)) {
        connectionFactory.setBrokerURL(TCP + hostName + COLON + port);
    } else {
        throw new Exception("Broker url could not be set as IP address/Port could not be obtained from input.");
    }
} else {
    connectionFactory.setBrokerURL(routeDetails.getInputEndPoint());
}
StringBuilder inputLocations = new StringBuilder(ACTIVE_MQ_TOPIC_PREFIX);
inputLocations.append(inputProperties[1]);
inputLocations.append(QUESTION_MARK + "maxConcurrentConsumers="
        + (MIN_CONSUMERS + (int) (Math.random() * MAX_CONSUMERS)));
jmsEndpoint = (JmsEndpoint) getContext().getEndpoint(inputLocations.toString());

PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(1);
pooledConnectionFactory.setConnectionFactory(connectionFactory);

ActiveMQConfiguration jmsConfiguration = new ActiveMQConfiguration();
jmsConfiguration.setPreserveMessageQos(true);
jmsConfiguration.setUsePooledConnection(false);
jmsConfiguration.setUseSingleConnection(true);
// Set the consumer type to custom which will help us in getting our own messageListenerContainer.
jmsConfiguration.setConsumerType(ConsumerType.Custom);
// Override the DMLC by creating our own messageListenerContainer which will help
// in cases when jms is down and we want to configure the number of retries.
jmsConfiguration.setMessageListenerContainerFactory(new MessageListenerContainer());
jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
jmsConfiguration.createMessageListenerContainer(jmsEndpoint);

jmsEndpoint.setConfiguration(jmsConfiguration);
jmsEndpoint.setTransacted(true);
jmsEndpoint.setSelector("header1='abc'");

I am pushing two messages with header1 = 'abc', but only one of them is
picked up by the route. The route does some processing on the first message
and then sleeps.

Can someone tell me whether there is an issue with the configuration of the
JMS endpoint?

Thanks,
Richa 




--
View this message in context: http://camel.465427.n5.nabble.com/ActiveMQ-topic-as-input-endpoint-in-Apache-Camel-tp5753957.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: ActiveMQ topic as input endpoint in Apache Camel

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Are you sending the messages as persistent, and is the consumer a durable
topic subscriber?
http://activemq.apache.org/why-do-i-not-receive-messages-on-my-durable-topic-subscription.html
http://activemq.apache.org/how-do-durable-queues-and-topics-work.html
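
To make the durable-subscription question concrete, here is a minimal sketch
of how such an endpoint URI could be assembled. It assumes the camel-jms
endpoint options clientId, durableSubscriptionName and selector. The class
name DurableTopicUri, the topic name "orders", and the client and
subscription names are hypothetical and not taken from the original post. The
selector is URL-encoded because characters such as = may need encoding inside
an endpoint URI.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class DurableTopicUri {

    // Builds an "activemq:topic:..." endpoint URI for a durable subscriber.
    // The option names are assumed to be the camel-jms ones; all values are
    // illustrative placeholders.
    public static String build(String topic, String clientId,
                               String subscriptionName, String selector) {
        return "activemq:topic:" + topic
                + "?clientId=" + encode(clientId)
                + "&durableSubscriptionName=" + encode(subscriptionName)
                + "&selector=" + encode(selector);
    }

    private static String encode(String value) {
        // URLEncoder writes spaces as '+'; use %20 so they decode unambiguously.
        // A literal '+' in the input is already encoded as %2B at this point.
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    public static void main(String[] args) {
        System.out.println(build("orders", "route-1", "route-1-sub", "header1='abc'"));
    }
}
```

A durable subscription is identified by its client id and subscription name,
so it is usually paired with a single consumer rather than a randomly chosen
maxConcurrentConsumers value. The producer also has to send with persistent
delivery for messages to be retained while the subscriber is offline.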



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/