Posted to users@camel.apache.org by kishore <ki...@gmail.com> on 2015/03/14 22:35:08 UTC
Concurrent Consumers creating duplicates
Hello All,
I have a simple route which takes messages from an incoming queue, processes
each message and writes it to an outgoing queue. When I log
exchange.getIn().getBody() inside my processor, I see duplicates.
Here is the snippet of my route
    MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
    // Config
    cf.setHostName("127.0.0.1");
    cf.setPort(1414);
    cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
    cf.setQueueManager("TestQueueManager");
    cf.setChannel("SYSTEM.DEF.SVRCONN");
    cxt.addComponent("websphere-mq", JmsComponent.jmsComponent(cf));
    cxt.addRoutes(new RouteBuilder() {
        public void configure() {
            from("websphere-mq:queue:incoming?concurrentConsumers=5")
                .process(new FixProcessor())
                .to("websphere-mq:queue:outgoing");
        }
    });
Here is the snippet of my processor
    public void process(Exchange exchange) throws Exception {
        originalMessage = exchange.getIn().getBody(String.class);
        ExecutionTransform execTran = new ExecutionTransform();
        LOG.info("originalMessage:" + originalMessage);
Here is the snippet of sample message generator
    MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
    // Config
    cf.setHostName("127.0.0.1");
    cf.setPort(1414);
    cf.setTransportType(1);
    cf.setQueueManager("TestQueueManager");
    cf.setChannel("SYSTEM.DEF.SVRCONN");
    MQQueueConnection connection = (MQQueueConnection) cf.createQueueConnection();
    MQQueueSession session = (MQQueueSession)
        connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    MQQueue queue = (MQQueue) session.createQueue("queue:///incoming");
    MQQueueSender sender = (MQQueueSender) session.createSender(queue);
    // Start the connection
    connection.start();
    for (int i = 0; i < 100; i++) {
        String msg = "Test Message " + i;
        JMSTextMessage message = (JMSTextMessage) session.createTextMessage(msg);
        System.out.println(message.toString());
        sender.send(message);
    }
After I send 100 messages, some originalMessage values are randomly printed
as duplicates.
Can someone please tell me what might be wrong with this?
Thanks,
Kishore.
--
View this message in context: http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158.html
Sent from the Camel - Users mailing list archive at Nabble.com.
Re: Concurrent Consumers creating duplicates
Posted by kishore <ki...@gmail.com>.
The problem was with the processor. I didn't realize that the processor is a
singleton. I have changed the code to use a bean with prototype scope, and
that resolved the issue. Thanks for looking into this, Willem Jiang.
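
The effect described above can be reproduced without Camel or MQ. The sketch below is only an illustration under stated assumptions: the full FixProcessor was never posted, so BodyProcessor, SharedFieldProcessor, LocalVariableProcessor and processAll are hypothetical names, and a plain thread pool stands in for the concurrent consumers. It keeps per-message state in an instance field of one shared object, as the posted process() method appears to do with originalMessage, and compares that with keeping the state in a local variable. Note that the local variable is a different remedy from the prototype-scoped bean used in this thread; both avoid sharing per-message state between threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingletonProcessorDemo {

    /** Hypothetical stand-in for a Camel Processor; it receives the body directly. */
    interface BodyProcessor {
        String process(String body) throws InterruptedException;
    }

    /** Unsafe: per-message state lives in a field shared by every worker thread. */
    static class SharedFieldProcessor implements BodyProcessor {
        private String originalMessage; // overwritten by whichever thread wrote last

        public String process(String body) throws InterruptedException {
            originalMessage = body;
            Thread.sleep(1);        // simulated work; widens the race window
            return originalMessage; // may by now hold another thread's message
        }
    }

    /** Safe: per-message state lives in a local variable, private to the caller. */
    static class LocalVariableProcessor implements BodyProcessor {
        public String process(String body) throws InterruptedException {
            String originalMessage = body;
            Thread.sleep(1);
            return originalMessage;
        }
    }

    /** Sends count messages through ONE processor instance using the given number of threads. */
    static Set<String> processAll(BodyProcessor processor, int count, int threads)
            throws InterruptedException {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < count; i++) {
            final String body = "Test Message " + i;
            pool.submit(() -> {
                seen.add(processor.process(body));
                return null; // makes this a Callable, so checked exceptions are allowed
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return seen; // distinct bodies that were actually "logged"
    }

    public static void main(String[] args) throws InterruptedException {
        // With 5 threads the shared field typically yields fewer than 100 distinct
        // bodies: some messages show up twice and others never show up at all.
        System.out.println("shared field:   "
                + processAll(new SharedFieldProcessor(), 100, 5).size());
        // The local-variable version yields all 100 distinct bodies.
        System.out.println("local variable: "
                + processAll(new LocalVariableProcessor(), 100, 5).size());
    }
}
```

With a single thread even the shared-field version sees every message exactly once, which matches the observation in this thread that concurrentConsumers=1 produced no duplicates.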
Re: Concurrent Consumers creating duplicates
Posted by Willem Jiang <wi...@gmail.com>.
The route looks good to me.
Can you double-check the acknowledge mode setting of the JMS connection?
--
Willem Jiang
Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang
Weibo: 姜宁willem
Re: Concurrent Consumers creating duplicates
Posted by kishore <ki...@gmail.com>.
Here is the WMQRouter. The Ack queue is for a different purpose: we have to
change the message by adding some more data and send the Ack back.
    public class WMQRouter extends RouteBuilder {

        public static org.apache.log4j.Logger LOG =
            org.apache.log4j.Logger.getLogger(WMQRouter.class);

        public String incomingQueue;
        public String outgoingQueue;
        public String backupFolder;
        public String ackQueue;

        public String getIncomingQueue() {
            return incomingQueue;
        }

        public void setIncomingQueue(String incomingQueue) {
            this.incomingQueue = incomingQueue;
        }

        public String getOutgoingQueue() {
            return outgoingQueue;
        }

        public void setOutgoingQueue(String outgoingQueue) {
            this.outgoingQueue = outgoingQueue;
        }

        public String getBackupFolder() {
            return backupFolder;
        }

        public void setBackupFolder(String backupFolder) {
            this.backupFolder = backupFolder;
        }

        public String getAckQueue() {
            return ackQueue;
        }

        public void setAckQueue(String ackQueue) {
            this.ackQueue = ackQueue;
        }

        /**
         * The configure method is invoked by the Camel to process the message
         * exchanges from the incoming queue and place the processed transaction xml
         * to corresponding queues This method also processes the on Exception
         * scenario wherein invoking the Error Handler and places the failure
         * message to the Failure Queue
         */
        @Override
        public void configure() throws Exception {
            String incomingQueue = "websphere:queue:" + getIncomingQueue();
            String outgoingQueue = "websphere:queue:" + getOutgoingQueue();
            String failureQueue = "websphere:queue:" + getAckQueue();
            String backupFolder = "file:" + getBackupFolder();

            FixParseValidator.init();

            FixProcessor fixProcessor = new FixProcessor();
            FIXAckProcessor fixAckProcessor = new FIXAckProcessor();
            FIXErrorHandler fixErrorHandler = new FIXErrorHandler();
            fixAckProcessor.setFixProcessor(fixProcessor);
            fixErrorHandler.setFixProcessor(fixProcessor);

            onException(Exception.class).process(fixErrorHandler);
            onCompletion().process(fixAckProcessor).to(failureQueue);
            from(incomingQueue).to(backupFolder).process(fixProcessor)
                .to(outgoingQueue);
        }
    }
Re: Concurrent Consumers creating duplicates
Posted by Willem Jiang <wi...@gmail.com>.
I couldn’t find the route definition for wmqRoute; you may need to show us the code of
com.broadridge.adapters.fixadapter.router.WMQRouter.
BTW, why does your route need to specify the “ackQueue”?
--
Willem Jiang
Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang
Weibo: 姜宁willem
Re: Concurrent Consumers creating duplicates
Posted by kishore <ki...@gmail.com>.
Here is my camel-context.xml file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://camel.apache.org/schema/spring
           http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="wmqRoute"
          class="com.broadridge.adapters.fixadapter.router.WMQRouter">
        <property name="incomingQueue" value="${INPUTQUEUE}" />
        <property name="outgoingQueue" value="${OUTPUTQUEUE}" />
        <property name="ackQueue" value="${ACKQUEUE}" />
        <property name="backupFolder" value="${BACKUPFOLDER}" />
        <property name="concurrentConsumers" value="${CONCURRENTCONSUMERS}" />
    </bean>

    <bean id="propertyConfigurer"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:config.properties">
        </property>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <routeBuilder ref="wmqRoute" />
    </camelContext>

    <bean id="websphere" class="org.apache.camel.component.jms.JmsComponent">
        <property name="configuration" ref="websphereConfig" />
    </bean>

    <bean id="websphereConnectionFactory"
          class="com.ibm.mq.jms.MQConnectionFactory">
        <property name="transportType" value="${TRANSPORT_TYPE}" />
        <property name="hostName" value="${HOSTNAME}" />
        <property name="port" value="${PORT}" />
        <property name="queueManager" value="${QUEUEMANAGER}" />
        <property name="channel" value="${CHANNEL}" />
    </bean>

    <bean id="websphereConfig"
          class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="websphereConnectionFactory" />
    </bean>
</beans>
Re: Concurrent Consumers creating duplicates
Posted by Willem Jiang <wi...@gmail.com>.
You may need to check the acknowledge mode setting of your JMS connection.
BTW, can you show us the JMS endpoint setting that you have?
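
For readers wondering where that setting lives: on a Camel JMS endpoint the acknowledge mode can be given as the acknowledgementModeName URI option, next to concurrentConsumers. The sketch below only assembles such a URI as a string; JmsEndpointUriSketch and consumerUri are hypothetical helper names, "websphere-mq" is the component name registered in the first post, and AUTO_ACKNOWLEDGE is used purely as an example value.

```java
public class JmsEndpointUriSketch {

    /**
     * Builds a Camel JMS consumer URI with an explicit acknowledge mode.
     * ackModeName is expected to be one of SESSION_TRANSACTED,
     * CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE.
     */
    static String consumerUri(String queue, int concurrentConsumers, String ackModeName) {
        return "websphere-mq:queue:" + queue
                + "?concurrentConsumers=" + concurrentConsumers
                + "&acknowledgementModeName=" + ackModeName;
    }

    public static void main(String[] args) {
        // The resulting string would be passed to from(...) in a RouteBuilder.
        System.out.println(consumerUri("incoming", 5, "AUTO_ACKNOWLEDGE"));
    }
}
```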
--
Willem Jiang
Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang
Weibo: 姜宁willem
Re: Concurrent Consumers creating duplicates
Posted by kishore <ki...@gmail.com>.
If I set concurrentConsumers to 1, all the input messages are processed
fine and there are no duplicates. If I set concurrentConsumers to 5, some
messages are lost and some messages are duplicated. Can someone please
suggest a solution?