Posted to users@camel.apache.org by kishore <ki...@gmail.com> on 2015/03/14 22:35:08 UTC

Concurrent Consumers creating duplicates

Hello All,

I have a simple route that takes messages from an incoming queue, processes
each message, and writes it to an outgoing queue. When I log
exchange.getIn().getBody() inside my processor, I see duplicates.

Here is the snippet of my route

			MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
			// Config
			cf.setHostName("127.0.0.1");
			cf.setPort(1414);
			cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
			cf.setQueueManager("TestQueueManager");
			cf.setChannel("SYSTEM.DEF.SVRCONN");
			cxt.addComponent("websphere-mq", JmsComponent.jmsComponent(cf));

			cxt.addRoutes(new RouteBuilder() {
				public void configure() {
					from("websphere-mq:queue:incoming?concurrentConsumers=5")
							.process(new FixProcessor())
							.to("websphere-mq:queue:outgoing");
				}
			});

Here is the snippet of my processor

	public void process(Exchange exchange) throws Exception {

		originalMessage = exchange.getIn().getBody(String.class);

		ExecutionTransform execTran = new ExecutionTransform();

		LOG.info("originalMessage:" + originalMessage);


Here is the snippet of sample message generator

		      MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
		      // Config
		      cf.setHostName("127.0.0.1");
		      cf.setPort(1414);
		      cf.setTransportType(1);
		      cf.setQueueManager("TestQueueManager");
		      cf.setChannel("SYSTEM.DEF.SVRCONN");

		      MQQueueConnection connection = (MQQueueConnection) cf.createQueueConnection();
		      MQQueueSession session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		      MQQueue queue = (MQQueue) session.createQueue("queue:///incoming");
		      MQQueueSender sender = (MQQueueSender) session.createSender(queue);
		      // Start the connection
		      connection.start();
		      for (int i = 0; i < 100; i++) {
		          String msg = "Test Message " + i;
		          JMSTextMessage message = (JMSTextMessage) session.createTextMessage(msg);
		          System.out.println(message.toString());
		          sender.send(message);
		      }

After I send 100 messages, some originalMessage values are logged more than
once, seemingly at random.

Can someone please tell me what might be wrong with this?

Thanks,
Kishore.



--
View this message in context: http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Concurrent Consumers creating duplicates

Posted by kishore <ki...@gmail.com>.
The problem was with the processor. I didn't realize that the processor is a
singleton. I have changed the code to use a bean with prototype scope, and
that resolved the issue. Thanks for looking into this, Willem Jiang.
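
Because one processor instance serves every concurrent consumer thread, a per-message value kept in an instance field can be overwritten by another thread before it is logged. Besides the prototype-scoped bean mentioned above, another common way to avoid this (a different technique, not the one the poster chose) is to keep the processor stateless and hold per-message data in local variables. The sketch below is self-contained and does not use Camel: StatelessProcessor, run, and the thread pool standing in for concurrentConsumers=5 are hypothetical names chosen only for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StatelessProcessorDemo {

    /** Hypothetical stand-in for a Camel Processor; it has no per-message fields. */
    static final class StatelessProcessor {
        String process(String body) {
            // Per-message state is a local variable, so every thread works on its own copy.
            String originalMessage = body;
            return "originalMessage:" + originalMessage;
        }
    }

    /** Pushes the given number of messages through ONE shared processor using several worker threads. */
    static Set<String> run(int messages, int consumers) {
        StatelessProcessor shared = new StatelessProcessor();
        Set<String> logged = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < messages; i++) {
            final String body = "Test Message " + i;
            pool.execute(() -> logged.add(shared.process(body)));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return logged;
    }

    public static void main(String[] args) {
        // 100 messages, 5 workers, as in the scenario from the thread.
        System.out.println("distinct log lines: " + run(100, 5).size());
    }
}
```

With 100 distinct message bodies, the set ends up with 100 distinct log lines regardless of how the five workers interleave, because nothing per-message is shared between threads.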




Re: Concurrent Consumers creating duplicates

Posted by Willem Jiang <wi...@gmail.com>.
The route looks good to me.
Can you double-check the JMS acknowledge setting of the JMS connection?


--  
Willem Jiang

Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem



On March 16, 2015 at 5:56:36 PM, kishore (kishoredevarasetty@gmail.com) wrote:
> Here is the WMQRouter. The Ack queue is for a different purpose. we have to
> change the message adding some more data and send the Ack back.


Re: Concurrent Consumers creating duplicates

Posted by kishore <ki...@gmail.com>.
Here is the WMQRouter. The Ack queue is for a different purpose: we have to
change the message by adding some more data and then send the Ack back.


public class WMQRouter extends RouteBuilder {

	public static org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(WMQRouter.class);

	public String incomingQueue;

	public String outgoingQueue;

	public String backupFolder;

	public String ackQueue;

	public String getIncomingQueue() {
		return incomingQueue;
	}

	public void setIncomingQueue(String incomingQueue) {
		this.incomingQueue = incomingQueue;
	}

	public String getOutgoingQueue() {
		return outgoingQueue;
	}

	public void setOutgoingQueue(String outgoingQueue) {
		this.outgoingQueue = outgoingQueue;
	}

	public String getBackupFolder() {
		return backupFolder;
	}

	public void setBackupFolder(String backupFolder) {
		this.backupFolder = backupFolder;
	}

	public String getAckQueue() {
		return ackQueue;
	}

	public void setAckQueue(String ackQueue) {
		this.ackQueue = ackQueue;
	}

	/**
	 * The configure method is invoked by Camel to process the message
	 * exchanges from the incoming queue and place the processed transaction
	 * XML on the corresponding queues. It also handles the onException
	 * scenario by invoking the error handler and placing the failure
	 * message on the failure queue.
	 */
	@Override
	public void configure() throws Exception {

		String incomingQueue = "websphere:queue:" + getIncomingQueue();

		String outgoingQueue = "websphere:queue:" + getOutgoingQueue();

		String failureQueue = "websphere:queue:" + getAckQueue();

		String backupFolder = "file:" + getBackupFolder();

		FixParseValidator.init();

		FixProcessor fixProcessor = new FixProcessor();
		FIXAckProcessor fixAckProcessor = new FIXAckProcessor();
		FIXErrorHandler fixErrorHandler = new FIXErrorHandler();

		fixAckProcessor.setFixProcessor(fixProcessor);
		fixErrorHandler.setFixProcessor(fixProcessor);

		onException(Exception.class).process(fixErrorHandler);

		onCompletion().process(fixAckProcessor).to(failureQueue);

		from(incomingQueue).to(backupFolder).process(fixProcessor)
				.to(outgoingQueue);
	}
}




Re: Concurrent Consumers creating duplicates

Posted by Willem Jiang <wi...@gmail.com>.
I didn’t find the route definition of wmqRoute. You may need to show us the
code of com.broadridge.adapters.fixadapter.router.WMQRouter.

BTW, why does your route need to specify the “ackQueue”?




On March 16, 2015 at 2:01:59 PM, kishore (kishoredevarasetty@gmail.com) wrote:
> Here is my camel-context.xml file


Re: Concurrent Consumers creating duplicates

Posted by kishore <ki...@gmail.com>.
Here is my camel-context.xml file

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
	xmlns:camel="http://camel.apache.org/schema/spring"
	xsi:schemaLocation="
	   http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring
       http://camel.apache.org/schema/spring/camel-spring.xsd">


	<bean id="wmqRoute" class="com.broadridge.adapters.fixadapter.router.WMQRouter">
		<property name="incomingQueue" value="${INPUTQUEUE}" />
		<property name="outgoingQueue" value="${OUTPUTQUEUE}" />
		<property name="ackQueue" value="${ACKQUEUE}" />
		<property name="backupFolder" value="${BACKUPFOLDER}" />
		<property name="concurrentConsumers" value="${CONCURRENTCONSUMERS}" />
	</bean>

	<bean id="propertyConfigurer"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

		<property name="location" value="classpath:config.properties">
		</property>
	</bean>

	<camelContext xmlns="http://camel.apache.org/schema/spring">
		<routeBuilder ref="wmqRoute" />
	</camelContext>

	<bean id="websphere" class="org.apache.camel.component.jms.JmsComponent">
		<property name="configuration" ref="websphereConfig" />
	</bean>

	
	<bean id="websphereConnectionFactory" class="com.ibm.mq.jms.MQConnectionFactory">
		<property name="transportType" value="${TRANSPORT_TYPE}" />
		<property name="hostName" value="${HOSTNAME}" />
		<property name="port" value="${PORT}" />
		<property name="queueManager" value="${QUEUEMANAGER}" />
		<property name="channel" value="${CHANNEL}" />
	</bean>

	<bean id="websphereConfig" class="org.apache.camel.component.jms.JmsConfiguration">
		<property name="connectionFactory" ref="websphereConnectionFactory" />
	</bean>

</beans>




Re: Concurrent Consumers creating duplicates

Posted by Willem Jiang <wi...@gmail.com>.
You may need to check the acknowledge mode setting of your JMS connection.
BTW, can you show us the JMS endpoint settings that you have?
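
For reference, on a Camel JMS endpoint both the consumer count and the acknowledge mode can be given as URI options: concurrentConsumers and acknowledgementModeName are options of the Camel JMS component. The sketch below only assembles such a URI as a string so it can run without Camel on the classpath; the consumerUri helper and the class name are hypothetical, the component name "websphere" is taken from the Spring configuration posted in this thread, and AUTO_ACKNOWLEDGE is used purely as an example value.

```java
public class JmsEndpointUriSketch {

    /**
     * Hypothetical helper: builds a Camel JMS consumer URI with an explicit
     * consumer count and acknowledge mode.
     */
    static String consumerUri(String component, String queue,
                              int concurrentConsumers, String acknowledgementModeName) {
        return component + ":queue:" + queue
                + "?concurrentConsumers=" + concurrentConsumers
                + "&acknowledgementModeName=" + acknowledgementModeName;
    }

    public static void main(String[] args) {
        // The resulting string is what would be passed to from(...) in a RouteBuilder.
        System.out.println(consumerUri("websphere", "incoming", 5, "AUTO_ACKNOWLEDGE"));
    }
}
```

Spelling the options out in the endpoint URI makes it easy to see which settings a route actually runs with when comparing a single-consumer and a multi-consumer run.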




On March 15, 2015 at 3:09:28 PM, kishore (kishoredevarasetty@gmail.com) wrote:
> If I set concurrentConsumers to 1, all the input messages are processed
> correctly and there are no duplicates. If I set concurrentConsumers to 5, some
> messages are lost and some are duplicated. Can someone please suggest a
> solution?


Re: Concurrent Consumers creating duplicates

Posted by kishore <ki...@gmail.com>.
If I set concurrentConsumers to 1, all the input messages are processed
correctly and there are no duplicates. If I set concurrentConsumers to 5, some
messages are lost and some are duplicated. Can someone please suggest a
solution?
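
A pattern of "fine with one consumer, lost and duplicated messages with five" is what one would expect when per-message state lives in a field of an object shared by all consumer threads, which is what the originalMessage assignment in the processor snippet appears to do. The sketch below reproduces that interleaving deterministically without Camel. StatefulProcessor, the afterWrite hook, and the latches are hypothetical and exist only to force the ordering "consumer 1 writes, consumer 2 writes, both read".

```java
import java.util.concurrent.CountDownLatch;

public class SharedFieldRaceDemo {

    /** Hypothetical processor that keeps the message body in an instance field. */
    static final class StatefulProcessor {
        private volatile String originalMessage; // shared by every thread using this instance

        String process(String body, Runnable afterWrite) {
            originalMessage = body;   // write to the shared field
            afterWrite.run();         // test hook: lets another thread run in between
            return "originalMessage:" + originalMessage; // read may see another thread's value
        }
    }

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Runs two "consumers" against one processor and returns what each of them logged. */
    static String[] run() {
        StatefulProcessor shared = new StatefulProcessor();
        CountDownLatch firstWritten = new CountDownLatch(1);
        CountDownLatch secondWritten = new CountDownLatch(1);
        String[] logged = new String[2];

        Thread consumer1 = new Thread(() -> logged[0] = shared.process("Test Message 0", () -> {
            firstWritten.countDown();
            await(secondWritten); // pause until consumer 2 has overwritten the field
        }));
        Thread consumer2 = new Thread(() -> {
            await(firstWritten);  // start only after consumer 1 stored its body
            logged[1] = shared.process("Test Message 1", secondWritten::countDown);
        });

        consumer1.start();
        consumer2.start();
        try {
            consumer1.join();
            consumer2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return logged;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Both consumers log "originalMessage:Test Message 1": message 0 never appears in the log and message 1 appears twice. With a single consumer the write and the read can never be separated by another thread, which matches the observation that concurrentConsumers=1 works.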


