You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2009/10/21 14:58:03 UTC
Re: explicitly setting replyTo doesn't scale with .threads() and JMS
cache.
Hi
Maybe this ticket can give some hints
https://issues.apache.org/activemq/browse/CAMEL-490
On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <er...@gmail.com> wrote:
>
> Hello.
> In order to optimize my InOut throughput, I added an explicit replyTo on my
> JMS endpoint.
> This doesn't seem to behave well with Camel threads and Spring JMS
> caching.
> The following test case shows the problem.
> Remove the explicit replyTo and everything works fine.
> Your comments are welcome.
>
> ================
>
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Callable;
> import javax.jms.TextMessage;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.Session;
> import javax.jms.JMSException;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.CamelContext;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.test.CamelTestSupport;
> import org.springframework.jms.connection.CachingConnectionFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import static
> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>
> /**
> * Unit test using a fixed replyTo specified on the JMS endpoint
> *
> * @version $Revision: 791824 $
> */
> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>
> private ActiveMQComponent amq;
> private static String MQURI = "failover:(tcp://localhost:61616)";
> // private static String MQURI =
> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>
> public class Replier implements Callable {
>
> @Override
> public Object call() throws Exception {
> log.info("replier started");
> JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>
> final TextMessage msg = (TextMessage)
> jms.receive("nameRequestor");
> assertEquals("What's your name", msg.getText());
>
> // there should be a JMSReplyTo so we know where to send the
> reply
> final Destination replyTo = msg.getJMSReplyTo();
>
> // send reply
> // Thread.sleep(10000);
> jms.send(replyTo, new MessageCreator() {
> public Message createMessage(Session session) throws
> JMSException {
> TextMessage replyMsg = session.createTextMessage();
> replyMsg.setText("My name is Arnio");
>
> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
> return replyMsg;
> }
> });
> return null;
> }
> };
> public void testCustomJMSReplyToInOut() throws Exception {
>
> MockEndpoint mock = getMockEndpoint("mock:result");
> mock.expectedBodiesReceived("My name is Arnio");
> mock.expectedMessageCount(10);
> // do not use Camel to send and receive to simulate a non Camel
> client
> // use another thread to listen and send the reply
> ExecutorService newFixedThreadPool =
> Executors.newFixedThreadPool(20);
>
> for (int i = 0 ; i < 10 ; i++) {
> newFixedThreadPool.submit(new Replier());
> }
>
> // now get started and send the first message that gets the ball
> rolling
> JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
> for (int i = 0 ; i < 10 ; i++) {
> jms.send("hello", new MessageCreator() {
> public Message createMessage(Session session) throws
> JMSException {
> TextMessage msg = session.createTextMessage();
> msg.setText("Hello, I'm here");
> return msg;
> }
>
> });
>
> log.info("Hello sent");
> Thread.sleep(10);
> }
>
> Thread.sleep(5000);
> assertMockEndpointsSatisfied();
> }
>
> protected RouteBuilder createRouteBuilder() throws Exception {
> return new RouteBuilder() {
>
> public void configure() throws Exception {
> from("activemq:queue:hello")
> .threads()
> .process(new Processor() {
> public void process(Exchange exchange) throws
> Exception {
>
> exchange.getOut().setBody("What's your name");
> }
> })
> // use in out to get a reply as well
>
> .to(ExchangePattern.InOut,
> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
> replyTo and everything works well
> .to("direct:replyprocessor");
>
> from("direct:replyprocessor").process(new Processor() {
> public void process(Exchange exchange) throws
> Exception {
> System.out.println("Here I am processing the
> reply " + exchange.getIn().getBody());
> }
> }).to("mock:result");
> }
> };
> }
>
> protected CamelContext createCamelContext() throws Exception {
> CamelContext camelContext = super.createCamelContext();
> amq = activeMQComponent(MQURI);
> ActiveMQConnectionFactory targetFactory = new
> ActiveMQConnectionFactory(MQURI);
> log.info("Using MQ CachingConnectionFactory");
> CachingConnectionFactory cachedAMQConnectionFactory = new
> CachingConnectionFactory(targetFactory);
>
> camelContext.addComponent("activemq",
> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>
> return camelContext;
> }
>
>
> }
>
> ===========
> --
> View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>
>
--
Claus Ibsen
Apache Camel Committer
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Re: explicitly setting replyTo doesn't scale with .threads() and JMS
cache.
Posted by Claus Ibsen <cl...@gmail.com>.
Hi
Also, use the concurrentConsumers option on the JMS endpoint to support
concurrency, instead of threads().
On Wed, Oct 21, 2009 at 2:58 PM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> Maybe this ticket can give some hints
> https://issues.apache.org/activemq/browse/CAMEL-490
>
> On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <er...@gmail.com> wrote:
>>
>> Hello.
>> In order to optimize my InOut throughput, I added an explicit replyTo on my
>> JMS endpoint.
>> This doesn't seem to behave well with Camel threads and Spring JMS
>> caching.
>> The following test case shows the problem.
>> Remove the explicit replyTo and everything works fine.
>> Your comments are welcome.
>>
>> ================
>>
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.Callable;
>> import javax.jms.TextMessage;
>> import javax.jms.Destination;
>> import javax.jms.Message;
>> import javax.jms.Session;
>> import javax.jms.JMSException;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.ExchangePattern;
>> import org.apache.camel.test.CamelTestSupport;
>> import org.springframework.jms.connection.CachingConnectionFactory;
>> import org.springframework.jms.core.JmsTemplate;
>> import org.springframework.jms.core.MessageCreator;
>> import static
>> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>>
>> /**
>> * Unit test using a fixed replyTo specified on the JMS endpoint
>> *
>> * @version $Revision: 791824 $
>> */
>> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>>
>> private ActiveMQComponent amq;
>> private static String MQURI = "failover:(tcp://localhost:61616)";
>> // private static String MQURI =
>> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>>
>> public class Replier implements Callable {
>>
>> @Override
>> public Object call() throws Exception {
>> log.info("replier started");
>> JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>>
>> final TextMessage msg = (TextMessage)
>> jms.receive("nameRequestor");
>> assertEquals("What's your name", msg.getText());
>>
>> // there should be a JMSReplyTo so we know where to send the
>> reply
>> final Destination replyTo = msg.getJMSReplyTo();
>>
>> // send reply
>> // Thread.sleep(10000);
>> jms.send(replyTo, new MessageCreator() {
>> public Message createMessage(Session session) throws
>> JMSException {
>> TextMessage replyMsg = session.createTextMessage();
>> replyMsg.setText("My name is Arnio");
>>
>> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
>> return replyMsg;
>> }
>> });
>> return null;
>> }
>> };
>> public void testCustomJMSReplyToInOut() throws Exception {
>>
>> MockEndpoint mock = getMockEndpoint("mock:result");
>> mock.expectedBodiesReceived("My name is Arnio");
>> mock.expectedMessageCount(10);
>> // do not use Camel to send and receive to simulate a non Camel
>> client
>> // use another thread to listen and send the reply
>> ExecutorService newFixedThreadPool =
>> Executors.newFixedThreadPool(20);
>>
>> for (int i = 0 ; i < 10 ; i++) {
>> newFixedThreadPool.submit(new Replier());
>> }
>>
>> // now get started and send the first message that gets the ball
>> rolling
>> JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>> for (int i = 0 ; i < 10 ; i++) {
>> jms.send("hello", new MessageCreator() {
>> public Message createMessage(Session session) throws
>> JMSException {
>> TextMessage msg = session.createTextMessage();
>> msg.setText("Hello, I'm here");
>> return msg;
>> }
>>
>> });
>>
>> log.info("Hello sent");
>> Thread.sleep(10);
>> }
>>
>> Thread.sleep(5000);
>> assertMockEndpointsSatisfied();
>> }
>>
>> protected RouteBuilder createRouteBuilder() throws Exception {
>> return new RouteBuilder() {
>>
>> public void configure() throws Exception {
>> from("activemq:queue:hello")
>> .threads()
>> .process(new Processor() {
>> public void process(Exchange exchange) throws
>> Exception {
>>
>> exchange.getOut().setBody("What's your name");
>> }
>> })
>> // use in out to get a reply as well
>>
>> .to(ExchangePattern.InOut,
>> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
>> replyTo and everything works well
>> .to("direct:replyprocessor");
>>
>> from("direct:replyprocessor").process(new Processor() {
>> public void process(Exchange exchange) throws
>> Exception {
>> System.out.println("Here I am processing the
>> reply " + exchange.getIn().getBody());
>> }
>> }).to("mock:result");
>> }
>> };
>> }
>>
>> protected CamelContext createCamelContext() throws Exception {
>> CamelContext camelContext = super.createCamelContext();
>> amq = activeMQComponent(MQURI);
>> ActiveMQConnectionFactory targetFactory = new
>> ActiveMQConnectionFactory(MQURI);
>> log.info("Using MQ CachingConnectionFactory");
>> CachingConnectionFactory cachedAMQConnectionFactory = new
>> CachingConnectionFactory(targetFactory);
>>
>> camelContext.addComponent("activemq",
>> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>>
>> return camelContext;
>> }
>>
>>
>> }
>>
>> ===========
>> --
>> View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
>> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>>
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>
--
Claus Ibsen
Apache Camel Committer
Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus