You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2009/10/21 14:58:03 UTC

Re: explicitly setting replyTo doesn't scale with .threads() and JMS cache.

Hi

Maybe this ticket can give some hints
https://issues.apache.org/activemq/browse/CAMEL-490

On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <er...@gmail.com> wrote:
>
> Hello.
> In order to optimize my InOut throughput I added explicit replyTo  on my JMS
> endpoint.
> This doesn't seems to be well behaving with camel threads and spring JMS
> caching.
> The following test case shows the problem.
> remove the explicit replyTo and everything works fine.
> your comments are welcome.
>
> ================
>
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Callable;
> import javax.jms.TextMessage;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.Session;
> import javax.jms.JMSException;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.CamelContext;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.test.CamelTestSupport;
> import org.springframework.jms.connection.CachingConnectionFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import static
> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>
> /**
>  * Unit test using a fixed replyTo specified on the JMS endpoint
>  *
>  * @version $Revision: 791824 $
>  */
> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>
>    private ActiveMQComponent amq;
>    private static String MQURI = "failover:(tcp://localhost:61616)";
>   // private static String MQURI =
> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>
>    public class Replier implements Callable {
>
>        @Override
>                public Object call() throws Exception {
>                log.info("replier started");
>                JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>
>                final TextMessage msg = (TextMessage)
> jms.receive("nameRequestor");
>                assertEquals("What's your name", msg.getText());
>
>                // there should be a JMSReplyTo so we know where to send the
> reply
>                final Destination replyTo = msg.getJMSReplyTo();
>
>                // send reply
>               // Thread.sleep(10000);
>                jms.send(replyTo, new MessageCreator() {
>                    public Message createMessage(Session session) throws
> JMSException {
>                        TextMessage replyMsg = session.createTextMessage();
>                        replyMsg.setText("My name is Arnio");
>
> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
>                        return replyMsg;
>                    }
>                });
>                return null;
>            }
>    };
>    public void testCustomJMSReplyToInOut() throws Exception {
>
>        MockEndpoint mock = getMockEndpoint("mock:result");
>        mock.expectedBodiesReceived("My name is Arnio");
>        mock.expectedMessageCount(10);
>        // do not use Camel to send and receive to simulate a non Camel
> client
>        // use another thread to listen and send the reply
>        ExecutorService newFixedThreadPool =
> Executors.newFixedThreadPool(20);
>
>        for (int i = 0 ; i < 10 ; i++) {
>          newFixedThreadPool.submit(new Replier());
>        }
>
>        // now get started and send the first message that gets the ball
> rolling
>        JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>        for (int i = 0 ; i < 10 ; i++) {
>        jms.send("hello", new MessageCreator() {
>            public Message createMessage(Session session) throws
> JMSException {
>                TextMessage msg = session.createTextMessage();
>                msg.setText("Hello, I'm here");
>                return msg;
>            }
>
>        });
>
>        log.info("Hello sent");
>        Thread.sleep(10);
>        }
>
>        Thread.sleep(5000);
>        assertMockEndpointsSatisfied();
>    }
>
>    protected RouteBuilder createRouteBuilder() throws Exception {
>        return new RouteBuilder() {
>
>            public void configure() throws Exception {
>                from("activemq:queue:hello")
>                   .threads()
>                    .process(new Processor() {
>                        public void process(Exchange exchange) throws
> Exception {
>
>                            exchange.getOut().setBody("What's your name");
>                        }
>                    })
>                    // use in out to get a reply as well
>
>                    .to(ExchangePattern.InOut,
> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
> replyTo and eveything works well
>                    .to("direct:replyprocessor");
>
>                    from("direct:replyprocessor").process(new Processor() {
>                            public void process(Exchange exchange) throws
> Exception {
>                                System.out.println("Here I am processing the
> reply " + exchange.getIn().getBody());
>                            }
>                    }).to("mock:result");
>            }
>        };
>    }
>
>    protected CamelContext createCamelContext() throws Exception {
>         CamelContext camelContext = super.createCamelContext();
>        amq = activeMQComponent(MQURI);
>        ActiveMQConnectionFactory targetFactory = new
> ActiveMQConnectionFactory(MQURI);
>         log.info("Using MQ CachingConnectionFactory");
>            CachingConnectionFactory cachedAMQConnectionFactory = new
> CachingConnectionFactory(targetFactory);
>
>        camelContext.addComponent("activemq",
> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>
>        return camelContext;
>    }
>
>
> }
>
> ===========
> --
> View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: explicitly setting replyTo doesn't scale with .threads() and JMS cache.

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Also use option concurrentConsumers on the JMS endpoint to support
concurrency, instead of threads.


On Wed, Oct 21, 2009 at 2:58 PM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> Maybe this ticket can give some hints
> https://issues.apache.org/activemq/browse/CAMEL-490
>
> On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <er...@gmail.com> wrote:
>>
>> Hello.
>> In order to optimize my InOut throughput I added explicit replyTo  on my JMS
>> endpoint.
>> This doesn't seems to be well behaving with camel threads and spring JMS
>> caching.
>> The following test case shows the problem.
>> remove the explicit replyTo and everything works fine.
>> your comments are welcome.
>>
>> ================
>>
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.Callable;
>> import javax.jms.TextMessage;
>> import javax.jms.Destination;
>> import javax.jms.Message;
>> import javax.jms.Session;
>> import javax.jms.JMSException;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.ExchangePattern;
>> import org.apache.camel.test.CamelTestSupport;
>> import org.springframework.jms.connection.CachingConnectionFactory;
>> import org.springframework.jms.core.JmsTemplate;
>> import org.springframework.jms.core.MessageCreator;
>> import static
>> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>>
>> /**
>>  * Unit test using a fixed replyTo specified on the JMS endpoint
>>  *
>>  * @version $Revision: 791824 $
>>  */
>> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>>
>>    private ActiveMQComponent amq;
>>    private static String MQURI = "failover:(tcp://localhost:61616)";
>>   // private static String MQURI =
>> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>>
>>    public class Replier implements Callable {
>>
>>        @Override
>>                public Object call() throws Exception {
>>                log.info("replier started");
>>                JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>>
>>                final TextMessage msg = (TextMessage)
>> jms.receive("nameRequestor");
>>                assertEquals("What's your name", msg.getText());
>>
>>                // there should be a JMSReplyTo so we know where to send the
>> reply
>>                final Destination replyTo = msg.getJMSReplyTo();
>>
>>                // send reply
>>               // Thread.sleep(10000);
>>                jms.send(replyTo, new MessageCreator() {
>>                    public Message createMessage(Session session) throws
>> JMSException {
>>                        TextMessage replyMsg = session.createTextMessage();
>>                        replyMsg.setText("My name is Arnio");
>>
>> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
>>                        return replyMsg;
>>                    }
>>                });
>>                return null;
>>            }
>>    };
>>    public void testCustomJMSReplyToInOut() throws Exception {
>>
>>        MockEndpoint mock = getMockEndpoint("mock:result");
>>        mock.expectedBodiesReceived("My name is Arnio");
>>        mock.expectedMessageCount(10);
>>        // do not use Camel to send and receive to simulate a non Camel
>> client
>>        // use another thread to listen and send the reply
>>        ExecutorService newFixedThreadPool =
>> Executors.newFixedThreadPool(20);
>>
>>        for (int i = 0 ; i < 10 ; i++) {
>>          newFixedThreadPool.submit(new Replier());
>>        }
>>
>>        // now get started and send the first message that gets the ball
>> rolling
>>        JmsTemplate jms = new
>> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>>        for (int i = 0 ; i < 10 ; i++) {
>>        jms.send("hello", new MessageCreator() {
>>            public Message createMessage(Session session) throws
>> JMSException {
>>                TextMessage msg = session.createTextMessage();
>>                msg.setText("Hello, I'm here");
>>                return msg;
>>            }
>>
>>        });
>>
>>        log.info("Hello sent");
>>        Thread.sleep(10);
>>        }
>>
>>        Thread.sleep(5000);
>>        assertMockEndpointsSatisfied();
>>    }
>>
>>    protected RouteBuilder createRouteBuilder() throws Exception {
>>        return new RouteBuilder() {
>>
>>            public void configure() throws Exception {
>>                from("activemq:queue:hello")
>>                   .threads()
>>                    .process(new Processor() {
>>                        public void process(Exchange exchange) throws
>> Exception {
>>
>>                            exchange.getOut().setBody("What's your name");
>>                        }
>>                    })
>>                    // use in out to get a reply as well
>>
>>                    .to(ExchangePattern.InOut,
>> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
>> replyTo and eveything works well
>>                    .to("direct:replyprocessor");
>>
>>                    from("direct:replyprocessor").process(new Processor() {
>>                            public void process(Exchange exchange) throws
>> Exception {
>>                                System.out.println("Here I am processing the
>> reply " + exchange.getIn().getBody());
>>                            }
>>                    }).to("mock:result");
>>            }
>>        };
>>    }
>>
>>    protected CamelContext createCamelContext() throws Exception {
>>         CamelContext camelContext = super.createCamelContext();
>>        amq = activeMQComponent(MQURI);
>>        ActiveMQConnectionFactory targetFactory = new
>> ActiveMQConnectionFactory(MQURI);
>>         log.info("Using MQ CachingConnectionFactory");
>>            CachingConnectionFactory cachedAMQConnectionFactory = new
>> CachingConnectionFactory(targetFactory);
>>
>>        camelContext.addComponent("activemq",
>> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>>
>>        return camelContext;
>>    }
>>
>>
>> }
>>
>> ===========
>> --
>> View this message in context: http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
>> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>>
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus