You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Jim Newsham <jn...@referentia.com> on 2011/07/08 03:18:56 UTC

slow reply for jms component when url contains replyTo

I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I 
specify a custom replyTo destination on the endpoint url, the time it 
takes for the producer to receive a reply increases drastically.  The 
curious thing is that the time to receive a reply is almost exactly 1 
second.  When I remove the replyTo from the url, everything's fast again.

I created a very simple, stand-alone test to demonstrate what I'm 
seeing.  There is a server class [4] which runs an embedded instance of 
ActiveMQ and simply replies to messages as they arrive; and a client [3] 
class which simply sends messages to the server, and prints the elapsed 
time.  The USE_REPLY_TO symbolic constant in the client determines 
whether a replyTo value is added to the url or not.

The client output when USE_REPLY_TO is false is shown as [1].  The 
client output when USE_REPLY_TO is true is shown as [2].  The code is 
pretty trivial.  Am I doing something wrong, or is this a Camel and/or 
ActiveMQ issue?

Thanks!
Jim


[1] USE_REPLY_TO = false

received reply in: 0.476 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
...


[2] USE_REPLY_TO = true

received reply in: 1.524 s
received reply in: 1.002 s
received reply in: 1.003 s
received reply in: 1.003 s
received reply in: 1.002 s
...


[3] TestReplyToClient.java

package test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;

public class TestReplyToClient {

   private static final boolean USE_REPLY_TO = false;

   public static void main(String... args) throws Exception {
     // create camel context; configure activemq component for 
tcp://localhost:7001
     CamelContext context = new DefaultCamelContext();
     ActiveMQComponent activemqComponent = 
ActiveMQComponent.activeMQComponent();
     activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
       null, null, "tcp://localhost:7001"));
     context.addComponent("activemq", activemqComponent);
     context.start();

     // define url to send requests to
     String sendUrl = "activemq:queue:dest";
     if (USE_REPLY_TO) {
       sendUrl += "?replyTo=replyQueue";
     }
     System.err.println("sending to url: " + sendUrl);

     // repeatedly send requests; measure elapsed time
     ProducerTemplate template = context.createProducerTemplate();
     while (true) {
       long startNanos = System.nanoTime();
       template.requestBody(sendUrl, "abc");
       long elapsedNanos = System.nanoTime() - startNanos;
       System.err.println(String.format("received reply in: %.3f s", 
elapsedNanos / 1000000000.0));
     }
   }

}


[4] TestReplyToServer.java

package test;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class TestReplyToServer {

   private static final String BROKER_NAME = "thebroker";

   public static void main(String... args) throws Exception {
     startBroker();
     startCamel();
     Thread.sleep(Long.MAX_VALUE);
   }

   private static void startBroker() throws Exception {
     BrokerService brokerService = new BrokerService();
     brokerService.setBrokerName(BROKER_NAME);
     brokerService.setSchedulerSupport(false);
     brokerService.setPersistent(false);
     brokerService.addConnector("tcp://0.0.0.0:7001");
     brokerService.start();
     brokerService.waitUntilStarted();
   }


   private static void startCamel() throws Exception {
     CamelContext context = new DefaultCamelContext();

     ActiveMQComponent activemqComponent = 
ActiveMQComponent.activeMQComponent();
     
activemqComponent.setBrokerURL(String.format("vm://%s?create=false", 
BROKER_NAME));
     context.addComponent("activemq", activemqComponent);

     final String receiveUrl = "activemq:queue:dest";
     context.addRoutes(new RouteBuilder() {
       @Override
       public void configure() throws Exception {
         from(receiveUrl).process(new Processor() {
           @Override
           public void process(Exchange exchange) throws Exception {
             System.err.println("received request");
             exchange.getOut().setBody("reply");
           }
         });
       }
     });

     context.start();
     System.err.println("listening on url: " + receiveUrl);
   }

}





Re: slow reply for jms component when url contains replyTo

Posted by Christian Müller <ch...@gmail.com>.
Hello Claus, hello Jim!

We also using fixed persistent reply to queue which are *exclusive* to
Camel. We have to do this, because we must not loose messages which is
potentially the case for temp queues, as I understood.

Best,
Christian

On Sat, Jul 9, 2011 at 5:52 PM, Claus Ibsen <cl...@gmail.com> wrote:

> Hi Jim
>
> I have created a ticket and posted some comments about the issue
> https://issues.apache.org/jira/browse/CAMEL-4202
>
> Are you using a fixed reply to queue that is *exclusive* to the Camel
> route?
> Or is the queue used for other purposes as well?
>
> Is there a special reason why you want to use a fixed reply to queue?
>
>
>
> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham <jn...@referentia.com>
> wrote:
> >
> > Hi Claus,
> >
> > I enabled trace logging.  I'm attaching the logs (for both client and
> > server; both with and without custom replyTo) as a zip file -- not sure
> if
> > the mailing list will filter it, we'll see.
> >
> > I see that there are 5 messages in the client log which only appear when
> a
> > custom replyTo is specified:  "Running purge task to see if any entries
> has
> > been timed out", "There are 1 in the timeout map", "did not receive a
> > message", etc.  Here's an excerpt from each client log, for one exchange:
> >
> > From log for client without replyTo:
> >
> > 2011-07-08 10:55:32,354 [main] TRACE
> > org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> > 2011-07-08 10:55:32,361 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Executing
> > callback on JMS Session: ActiveMQSession
> > {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
> > 2011-07-08 10:55:32,361 [main] TRACE
> > org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> > 2011-07-08 10:55:32,362 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Sending
> > JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
> =
> > 0, responseRequired = false, messageId = null, originalDestination =
> null,
> > originalTransactionId = null, producerId = null, destination = null,
> > transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> > brokerInTime = 0, brokerOutTime = 0, correlationId =
> > ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
> > false, type = null, priority = 0, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
> > false, droppable = false, text = abc}
> > 2011-07-08 10:55:32,363 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
> JMS
> > message to: queue://dest with message: ActiveMQTextMessage {commandId =
> 0,
> > responseRequired = false, messageId =
> > ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
> > null, originalTransactionId = null, producerId = null, destination =
> > queue://dest, transactionId = null, expiration = 1310158552362, timestamp
> =
> > 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> > correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
> > true, type = null, priority = 4, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
> > false, droppable = false, text = abc}
> > 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
> Received
> > reply message with correlationID:
> ID-rsi-eng-newsham-61472-1310158530715-0-4
> > -> ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId
> =
> > ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
> > null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
> =
> > null, expiration = 0, timestamp = 1310158532367, arrival = 0,
> brokerInTime =
> > 1310158532367, brokerOutTime = 1310158532367, correlationId =
> > ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
> > true, type = null, priority = 4, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
> true,
> > droppable = false, text = reply}
> > 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
> > org.apache.camel.component.jms.JmsBinding - Extracting body as a
> TextMessage
> > from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
> > true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
> > originalDestination = null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
> =
> > null, expiration = 0, timestamp = 1310158532367, arrival = 0,
> brokerInTime =
> > 1310158532367, brokerOutTime = 1310158532367, correlationId =
> > ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
> > true, type = null, priority = 4, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
> true,
> > droppable = false, text = reply}
> > 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
> > received. Setting reply as OUT message: reply
> > received reply in: 0.015 s
> >
> > From log for client with replyTo:
> >
> > 2011-07-08 10:52:10,075 [main] TRACE
> > org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> > 2011-07-08 10:52:10,081 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Executing
> > callback on JMS Session: ActiveMQSession
> > {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
> > 2011-07-08 10:52:10,082 [main] TRACE
> > org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> > 2011-07-08 10:52:10,082 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Sending
> > JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
> =
> > 0, responseRequired = false, messageId = null, originalDestination =
> null,
> > originalTransactionId = null, producerId = null, destination = null,
> > transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> > brokerInTime = 0, brokerOutTime = 0, correlationId =
> > ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
> > persistent = false, type = null, priority = 0, groupID = null,
> groupSequence
> > = 0, targetConsumerId = null, compressed = false, userID = null, content
> =
> > null, marshalledProperties = null, dataStructure = null,
> redeliveryCounter =
> > 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody
> =
> > false, droppable = false, text = abc}
> > 2011-07-08 10:52:10,083 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
> JMS
> > message to: queue://dest with message: ActiveMQTextMessage {commandId =
> 0,
> > responseRequired = false, messageId =
> > ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
> > null, originalTransactionId = null, producerId = null, destination =
> > queue://dest, transactionId = null, expiration = 1310158350082, timestamp
> =
> > 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> > correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
> > queue://replyQueue, persistent = true, type = null, priority = 4, groupID
> =
> > null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID
> > = null, content = null, marshalledProperties = null, dataStructure =
> null,
> > redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> > false, readOnlyBody = false, droppable = false, text = abc}
> > 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> > JmsReplyManagerTimeoutChecker[dest] TRACE
> > org.apache.camel.component.jms.reply.CorrelationMap - Running purge task
> to
> > see if any entries has been timed out
> > 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> > JmsReplyManagerTimeoutChecker[dest] TRACE
> > org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
> > timeout map
> > 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
> >
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> > - Consumer [ActiveMQMessageConsumer {
> > value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
> > session [ActiveMQSession
> > {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
> > receive a message
> > 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
> > org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using
> >
> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> >
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> > - Received message of type [class
> > org.apache.activemq.command.ActiveMQTextMessage] from consumer
> > [ActiveMQMessageConsumer {
> > value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
> > session [ActiveMQSession
> > {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
> Received
> > reply message with correlationID:
> ID-rsi-eng-newsham-61454-1310158327407-0-4
> > -> ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId
> =
> > ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
> > null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> > queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> > 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
> > 1310158331077, correlationId =
> ID-rsi-eng-newsham-61454-1310158327407-0-4,
> > replyTo = null, persistent = true, type = null, priority = 4, groupID =
> > null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID
> > = null, content = null, marshalledProperties = null, dataStructure =
> null,
> > redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> > true, readOnlyBody = true, droppable = false, text = reply}
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
> > org.apache.camel.component.jms.JmsBinding - Extracting body as a
> TextMessage
> > from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
> > true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
> > originalDestination = null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> > queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> > 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
> > 1310158331077, correlationId =
> ID-rsi-eng-newsham-61454-1310158327407-0-4,
> > replyTo = null, persistent = true, type = null, priority = 4, groupID =
> > null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID
> > = null, content = null, marshalledProperties = null, dataStructure =
> null,
> > redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> > true, readOnlyBody = true, droppable = false, text = reply}
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
> > received. Setting reply as OUT message: reply
> > received reply in: 1.004 s
> >
> > Thanks,
> > Jim
> >
> >
> >
> > On 7/7/2011 10:59 PM, Claus Ibsen wrote:
> >>
> >> Can you enable TRACE logging at org.apache.camel.component.jms and run
> >> it for both examples.
> >> To see what happens.
> >>
> >>
> >>
> >> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
> >>  wrote:
> >>>
> >>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
> I
> >>> specify a custom replyTo destination on the endpoint url, the time it
> >>> takes
> >>> for the producer to receive a reply increases drastically.  The curious
> >>> thing is that the time to receive a reply is almost exactly 1 second.
> >>>  When
> >>> I remove the replyTo from the url, everything's fast again.
> >>>
> >>> I created a very simple, stand-alone test to demonstrate what I'm
> seeing.
> >>>  There is a server class [4] which runs an embedded instance of
> ActiveMQ
> >>> and
> >>> simply replies to messages as they arrive; and a client [3] class which
> >>> simply sends messages to the server, and prints the elapsed time.  The
> >>> USE_REPLY_TO symbolic constant in the client determines whether a
> replyTo
> >>> value is added to the url or not.
> >>>
> >>> The client output when USE_REPLY_TO is false is shown as [1].  The
> client
> >>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> >>> trivial.  Am I doing something wrong, or is this a Camel and/or
> ActiveMQ
> >>> issue?
> >>>
> >>> Thanks!
> >>> Jim
> >>>
> >>>
> >>> [1] USE_REPLY_TO = false
> >>>
> >>> received reply in: 0.476 s
> >>> received reply in: 0.006 s
> >>> received reply in: 0.006 s
> >>> received reply in: 0.006 s
> >>> received reply in: 0.006 s
> >>> ...
> >>>
> >>>
> >>> [2] USE_REPLY_TO = true
> >>>
> >>> received reply in: 1.524 s
> >>> received reply in: 1.002 s
> >>> received reply in: 1.003 s
> >>> received reply in: 1.003 s
> >>> received reply in: 1.002 s
> >>> ...
> >>>
> >>>
> >>> [3] TestReplyToClient.java
> >>>
> >>> package test;
> >>>
> >>> import org.apache.activemq.ActiveMQConnectionFactory;
> >>> import org.apache.activemq.camel.component.ActiveMQComponent;
> >>> import org.apache.camel.CamelContext;
> >>> import org.apache.camel.ProducerTemplate;
> >>> import org.apache.camel.impl.DefaultCamelContext;
> >>>
> >>> public class TestReplyToClient {
> >>>
> >>>  private static final boolean USE_REPLY_TO = false;
> >>>
> >>>  public static void main(String... args) throws Exception {
> >>>    // create camel context; configure activemq component for
> >>> tcp://localhost:7001
> >>>    CamelContext context = new DefaultCamelContext();
> >>>    ActiveMQComponent activemqComponent =
> >>> ActiveMQComponent.activeMQComponent();
> >>>    activemqComponent.setConnectionFactory(new
> ActiveMQConnectionFactory(
> >>>      null, null, "tcp://localhost:7001"));
> >>>    context.addComponent("activemq", activemqComponent);
> >>>    context.start();
> >>>
> >>>    // define url to send requests to
> >>>    String sendUrl = "activemq:queue:dest";
> >>>    if (USE_REPLY_TO) {
> >>>      sendUrl += "?replyTo=replyQueue";
> >>>    }
> >>>    System.err.println("sending to url: " + sendUrl);
> >>>
> >>>    // repeatedly send requests; measure elapsed time
> >>>    ProducerTemplate template = context.createProducerTemplate();
> >>>    while (true) {
> >>>      long startNanos = System.nanoTime();
> >>>      template.requestBody(sendUrl, "abc");
> >>>      long elapsedNanos = System.nanoTime() - startNanos;
> >>>      System.err.println(String.format("received reply in: %.3f s",
> >>> elapsedNanos / 1000000000.0));
> >>>    }
> >>>  }
> >>>
> >>> }
> >>>
> >>>
> >>> [4] TestReplyToServer.java
> >>>
> >>> package test;
> >>>
> >>> import org.apache.activemq.broker.BrokerService;
> >>> import org.apache.activemq.camel.component.ActiveMQComponent;
> >>> import org.apache.camel.CamelContext;
> >>> import org.apache.camel.Exchange;
> >>> import org.apache.camel.Processor;
> >>> import org.apache.camel.builder.RouteBuilder;
> >>> import org.apache.camel.impl.DefaultCamelContext;
> >>>
> >>> public class TestReplyToServer {
> >>>
> >>>  private static final String BROKER_NAME = "thebroker";
> >>>
> >>>  public static void main(String... args) throws Exception {
> >>>    startBroker();
> >>>    startCamel();
> >>>    Thread.sleep(Long.MAX_VALUE);
> >>>  }
> >>>
> >>>  private static void startBroker() throws Exception {
> >>>    BrokerService brokerService = new BrokerService();
> >>>    brokerService.setBrokerName(BROKER_NAME);
> >>>    brokerService.setSchedulerSupport(false);
> >>>    brokerService.setPersistent(false);
> >>>    brokerService.addConnector("tcp://0.0.0.0:7001");
> >>>    brokerService.start();
> >>>    brokerService.waitUntilStarted();
> >>>  }
> >>>
> >>>
> >>>  private static void startCamel() throws Exception {
> >>>    CamelContext context = new DefaultCamelContext();
> >>>
> >>>    ActiveMQComponent activemqComponent =
> >>> ActiveMQComponent.activeMQComponent();
> >>>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> >>> BROKER_NAME));
> >>>    context.addComponent("activemq", activemqComponent);
> >>>
> >>>    final String receiveUrl = "activemq:queue:dest";
> >>>    context.addRoutes(new RouteBuilder() {
> >>>      @Override
> >>>      public void configure() throws Exception {
> >>>        from(receiveUrl).process(new Processor() {
> >>>          @Override
> >>>          public void process(Exchange exchange) throws Exception {
> >>>            System.err.println("received request");
> >>>            exchange.getOut().setBody("reply");
> >>>          }
> >>>        });
> >>>      }
> >>>    });
> >>>
> >>>    context.start();
> >>>    System.err.println("listening on url: " + receiveUrl);
> >>>  }
> >>>
> >>> }
> >>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>

Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

On Tue, Jul 12, 2011 at 10:51 PM, Jim Newsham <jn...@referentia.com> wrote:
> On 7/11/2011 7:25 PM, Claus Ibsen wrote:
>>
>> On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham<jn...@referentia.com>
>>  wrote:
>>>
>>> Hi Claus,
>>>
>>> Thanks for opening the jira issue, and for your comments.  To answer your
>>> questions:
>>>
>>> 1.  We use fixed reply-to queues which are exclusive to Camel.
>>> 2.  We need a fixed reply-to queue to avoid losing reply messages in case
>>> of
>>> disconnection (which would happen with temporary queues because they are
>>> scoped to the lifetime of the connection).
>>>
>> I think we should add an option to the JMS component so you can
>> configure that the replyTo is to be consider as exclusive.
>>
>> ?replyTo=bar&exclusiveReplyTo=true
>>
>> Something like this. Anyone got a better idea for a name of the option?
>
> That would be fine for me.  :)  I don't have any better ideas regarding the
> parameter name.
>

Okay I guess exclusiveReplyTo is a good name.
We dont want to go down with an enum in case we get a 3rd variation in
the future?
A boolean is simple to understand, so lets pick that.

>
>> Likewise I have pondered about if we should add an option to have
>> convention over configuration? So for example you can configure on the
>> jms component, a pattern of the reply to names.
>>
>> jmsComponent.setExclusiveReplyTo(true);
>> jmsComponent.setReplyToPattern("${name}.reply");
>>
>> So in this example Camel will automatic name the reply to queues, as
>> the request queue name + ".reply". So in the example above we can do
>>
>> from X
>> to("jms:queue:bar")
>> to Y
>>
>> So in this example since we configured the component with exclusive
>> reply to, and a reply to pattern as well. Then what happens is that
>> Camel will use a reply to queue named: bar.reply
>>
>>
>> Of course you can still configure all the options on the endpoint as
>> well if you like. And you can override the component settings so in
>> case you want a special reply to name you can do:
>>
>> from X
>> to("jms:queue:bar?replyTo=special")
>> to Y
>>
>> Any thoughts?
>
> This wouldn't be useful for me since I don't have a one-to-one
> correspondence between request queues and reply queues, but I can see how it
> would be useful for people who do.  Some people may want the ability to
> override the settings on a particular url, so that a temporary queue is used
> for that route.
>

Ah good idea. You could possible set that as ?replyTo= but that would be "ugly".
So we could have that enum and then people could say

replyToType=temporary
replyToType=shared
replyToType=exclusive

Now anyone got a good name for such an option?

We would need a bit validation to guard against invalid configurations such as
?replyTo=foo&replyToType=temporary

As you cannot use a fixed name for a temporary queue name.


> Jim
>
>>
>>
>>> Regards,
>>> Jim
>>>
>>> On 7/9/2011 5:52 AM, Claus Ibsen wrote:
>>>>
>>>> Hi Jim
>>>>
>>>> I have created a ticket and posted some comments about the issue
>>>> https://issues.apache.org/jira/browse/CAMEL-4202
>>>>
>>>> Are you using a fixed reply to queue that is *exclusive* to the Camel
>>>> route?
>>>> Or is the queue used for other purposes as well?
>>>>
>>>> Is there a special reason why you want to use a fixed reply to queue?
>>>>
>>>>
>>>>
>>>> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jn...@referentia.com>
>>>>  wrote:
>>>>>
>>>>> Hi Claus,
>>>>>
>>>>> I enabled trace logging.  I'm attaching the logs (for both client and
>>>>> server; both with and without custom replyTo) as a zip file -- not sure
>>>>> if
>>>>> the mailing list will filter it, we'll see.
>>>>>
>>>>> I see that there are 5 messages in the client log which only appear
>>>>> when
>>>>> a
>>>>> custom replyTo is specified:  "Running purge task to see if any entries
>>>>> has
>>>>> been timed out", "There are 1 in the timeout map", "did not receive a
>>>>> message", etc.  Here's an excerpt from each client log, for one
>>>>> exchange:
>>>>>
>>>>>  From log for client without replyTo:
>>>>>
>>>>> 2011-07-08 10:55:32,354 [main] TRACE
>>>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>>>> 2011-07-08 10:55:32,361 [main] DEBUG
>>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>>> Executing
>>>>> callback on JMS Session: ActiveMQSession
>>>>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
>>>>> 2011-07-08 10:55:32,361 [main] TRACE
>>>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>>>> 2011-07-08 10:55:32,362 [main] DEBUG
>>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>>> Sending
>>>>> JMS message to: queue://dest with message: ActiveMQTextMessage
>>>>> {commandId
>>>>> =
>>>>> 0, responseRequired = false, messageId = null, originalDestination =
>>>>> null,
>>>>> originalTransactionId = null, producerId = null, destination = null,
>>>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>>>> false, type = null, priority = 0, groupID = null, groupSequence = 0,
>>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>>> null,
>>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
>>>>> 0,
>>>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>>>> false, droppable = false, text = abc}
>>>>> 2011-07-08 10:55:32,363 [main] DEBUG
>>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>>>> JMS
>>>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>>>> 0,
>>>>> responseRequired = false, messageId =
>>>>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
>>>>> null, originalTransactionId = null, producerId = null, destination =
>>>>> queue://dest, transactionId = null, expiration = 1310158552362,
>>>>> timestamp
>>>>> =
>>>>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>>>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>>> null,
>>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
>>>>> 0,
>>>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>>>> false, droppable = false, text = abc}
>>>>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
>>>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
>>>>> Received
>>>>> reply message with correlationID:
>>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4
>>>>> ->    ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>>>> messageId =
>>>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
>>>>> null, originalTransactionId = null, producerId =
>>>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1,
>>>>> transactionId
>>>>> =
>>>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>>>> brokerInTime =
>>>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent
>>>>> =
>>>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>>> null,
>>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
>>>>> 0,
>>>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>>>> true,
>>>>> droppable = false, text = reply}
>>>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
>>>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>>>> TextMessage
>>>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired
>>>>> =
>>>>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
>>>>> originalDestination = null, originalTransactionId = null, producerId =
>>>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1,
>>>>> transactionId
>>>>> =
>>>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>>>> brokerInTime =
>>>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent
>>>>> =
>>>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>>> null,
>>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
>>>>> 0,
>>>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>>>> true,
>>>>> droppable = false, text = reply}
>>>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
>>>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
>>>>> received. Setting reply as OUT message: reply
>>>>> received reply in: 0.015 s
>>>>>
>>>>>  From log for client with replyTo:
>>>>>
>>>>> 2011-07-08 10:52:10,075 [main] TRACE
>>>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>>>> 2011-07-08 10:52:10,081 [main] DEBUG
>>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>>> Executing
>>>>> callback on JMS Session: ActiveMQSession
>>>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
>>>>> 2011-07-08 10:52:10,082 [main] TRACE
>>>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>>>> 2011-07-08 10:52:10,082 [main] DEBUG
>>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>>> Sending
>>>>> JMS message to: queue://dest with message: ActiveMQTextMessage
>>>>> {commandId
>>>>> =
>>>>> 0, responseRequired = false, messageId = null, originalDestination =
>>>>> null,
>>>>> originalTransactionId = null, producerId = null, destination = null,
>>>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
>>>>> queue://replyQueue,
>>>>> persistent = false, type = null, priority = 0, groupID = null,
>>>>> groupSequence
>>>>> = 0, targetConsumerId = null, compressed = false, userID = null,
>>>>> content
>>>>> =
>>>>> null, marshalledProperties = null, dataStructure = null,
>>>>> redeliveryCounter =
>>>>> 0, size = 0, properties = null, readOnlyProperties = false,
>>>>> readOnlyBody
>>>>> =
>>>>> false, droppable = false, text = abc}
>>>>> 2011-07-08 10:52:10,083 [main] DEBUG
>>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>>>> JMS
>>>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>>>> 0,
>>>>> responseRequired = false, messageId =
>>>>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
>>>>> null, originalTransactionId = null, producerId = null, destination =
>>>>> queue://dest, transactionId = null, expiration = 1310158350082,
>>>>> timestamp
>>>>> =
>>>>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>>>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
>>>>> queue://replyQueue, persistent = true, type = null, priority = 4,
>>>>> groupID
>>>>> =
>>>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>>>> userID
>>>>> = null, content = null, marshalledProperties = null, dataStructure =
>>>>> null,
>>>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
>>>>> =
>>>>> false, readOnlyBody = false, droppable = false, text = abc}
>>>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>>>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge
>>>>> task
>>>>> to
>>>>> see if any entries has been timed out
>>>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>>>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in
>>>>> the
>>>>> timeout map
>>>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
>>>>>
>>>>>
>>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>>>> - Consumer [ActiveMQMessageConsumer {
>>>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }]
>>>>> of
>>>>> session [ActiveMQSession
>>>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
>>>>> receive a message
>>>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
>>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
>>>>> Using
>>>>>
>>>>>
>>>>> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
>>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>>>
>>>>>
>>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>>>> - Received message of type [class
>>>>> org.apache.activemq.command.ActiveMQTextMessage] from consumer
>>>>> [ActiveMQMessageConsumer {
>>>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }]
>>>>> of
>>>>> session [ActiveMQSession
>>>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
>>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
>>>>> Received
>>>>> reply message with correlationID:
>>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4
>>>>> ->    ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>>>> messageId =
>>>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
>>>>> null, originalTransactionId = null, producerId =
>>>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime
>>>>> =
>>>>> 1310158331077, correlationId =
>>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>>>> userID
>>>>> = null, content = null, marshalledProperties = null, dataStructure =
>>>>> null,
>>>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
>>>>> =
>>>>> true, readOnlyBody = true, droppable = false, text = reply}
>>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
>>>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>>>> TextMessage
>>>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired
>>>>> =
>>>>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
>>>>> originalDestination = null, originalTransactionId = null, producerId =
>>>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime
>>>>> =
>>>>> 1310158331077, correlationId =
>>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>>>> userID
>>>>> = null, content = null, marshalledProperties = null, dataStructure =
>>>>> null,
>>>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
>>>>> =
>>>>> true, readOnlyBody = true, droppable = false, text = reply}
>>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
>>>>> Reply
>>>>> received. Setting reply as OUT message: reply
>>>>> received reply in: 1.004 s
>>>>>
>>>>> Thanks,
>>>>> Jim
>>>>>
>>>>>
>>>>>
>>>>> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
>>>>>>
>>>>>> Can you enable TRACE logging at org.apache.camel.component.jms and run
>>>>>> it for both examples.
>>>>>> To see what happens.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
>>>>>>  wrote:
>>>>>>>
>>>>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason,
>>>>>>> when
>>>>>>> I
>>>>>>> specify a custom replyTo destination on the endpoint url, the time it
>>>>>>> takes
>>>>>>> for the producer to receive a reply increases drastically.  The
>>>>>>> curious
>>>>>>> thing is that the time to receive a reply is almost exactly 1 second.
>>>>>>>  When
>>>>>>> I remove the replyTo from the url, everything's fast again.
>>>>>>>
>>>>>>> I created a very simple, stand-alone test to demonstrate what I'm
>>>>>>> seeing.
>>>>>>>  There is a server class [4] which runs an embedded instance of
>>>>>>> ActiveMQ
>>>>>>> and
>>>>>>> simply replies to messages as they arrive; and a client [3] class
>>>>>>> which
>>>>>>> simply sends messages to the server, and prints the elapsed time.
>>>>>>>  The
>>>>>>> USE_REPLY_TO symbolic constant in the client determines whether a
>>>>>>> replyTo
>>>>>>> value is added to the url or not.
>>>>>>>
>>>>>>> The client output when USE_REPLY_TO is false is shown as [1].  The
>>>>>>> client
>>>>>>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>>>>>>> trivial.  Am I doing something wrong, or is this a Camel and/or
>>>>>>> ActiveMQ
>>>>>>> issue?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Jim
>>>>>>>
>>>>>>>
>>>>>>> [1] USE_REPLY_TO = false
>>>>>>>
>>>>>>> received reply in: 0.476 s
>>>>>>> received reply in: 0.006 s
>>>>>>> received reply in: 0.006 s
>>>>>>> received reply in: 0.006 s
>>>>>>> received reply in: 0.006 s
>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>> [2] USE_REPLY_TO = true
>>>>>>>
>>>>>>> received reply in: 1.524 s
>>>>>>> received reply in: 1.002 s
>>>>>>> received reply in: 1.003 s
>>>>>>> received reply in: 1.003 s
>>>>>>> received reply in: 1.002 s
>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>> [3] TestReplyToClient.java
>>>>>>>
>>>>>>> package test;
>>>>>>>
>>>>>>> import org.apache.activemq.ActiveMQConnectionFactory;
>>>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>>>> import org.apache.camel.CamelContext;
>>>>>>> import org.apache.camel.ProducerTemplate;
>>>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>>>
>>>>>>> public class TestReplyToClient {
>>>>>>>
>>>>>>>  private static final boolean USE_REPLY_TO = false;
>>>>>>>
>>>>>>>  public static void main(String... args) throws Exception {
>>>>>>>    // create camel context; configure activemq component for
>>>>>>> tcp://localhost:7001
>>>>>>>    CamelContext context = new DefaultCamelContext();
>>>>>>>    ActiveMQComponent activemqComponent =
>>>>>>> ActiveMQComponent.activeMQComponent();
>>>>>>>    activemqComponent.setConnectionFactory(new
>>>>>>> ActiveMQConnectionFactory(
>>>>>>>      null, null, "tcp://localhost:7001"));
>>>>>>>    context.addComponent("activemq", activemqComponent);
>>>>>>>    context.start();
>>>>>>>
>>>>>>>    // define url to send requests to
>>>>>>>    String sendUrl = "activemq:queue:dest";
>>>>>>>    if (USE_REPLY_TO) {
>>>>>>>      sendUrl += "?replyTo=replyQueue";
>>>>>>>    }
>>>>>>>    System.err.println("sending to url: " + sendUrl);
>>>>>>>
>>>>>>>    // repeatedly send requests; measure elapsed time
>>>>>>>    ProducerTemplate template = context.createProducerTemplate();
>>>>>>>    while (true) {
>>>>>>>      long startNanos = System.nanoTime();
>>>>>>>      template.requestBody(sendUrl, "abc");
>>>>>>>      long elapsedNanos = System.nanoTime() - startNanos;
>>>>>>>      System.err.println(String.format("received reply in: %.3f s",
>>>>>>> elapsedNanos / 1000000000.0));
>>>>>>>    }
>>>>>>>  }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> [4] TestReplyToServer.java
>>>>>>>
>>>>>>> package test;
>>>>>>>
>>>>>>> import org.apache.activemq.broker.BrokerService;
>>>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>>>> import org.apache.camel.CamelContext;
>>>>>>> import org.apache.camel.Exchange;
>>>>>>> import org.apache.camel.Processor;
>>>>>>> import org.apache.camel.builder.RouteBuilder;
>>>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>>>
>>>>>>> public class TestReplyToServer {
>>>>>>>
>>>>>>>  private static final String BROKER_NAME = "thebroker";
>>>>>>>
>>>>>>>  public static void main(String... args) throws Exception {
>>>>>>>    startBroker();
>>>>>>>    startCamel();
>>>>>>>    Thread.sleep(Long.MAX_VALUE);
>>>>>>>  }
>>>>>>>
>>>>>>>  private static void startBroker() throws Exception {
>>>>>>>    BrokerService brokerService = new BrokerService();
>>>>>>>    brokerService.setBrokerName(BROKER_NAME);
>>>>>>>    brokerService.setSchedulerSupport(false);
>>>>>>>    brokerService.setPersistent(false);
>>>>>>>    brokerService.addConnector("tcp://0.0.0.0:7001");
>>>>>>>    brokerService.start();
>>>>>>>    brokerService.waitUntilStarted();
>>>>>>>  }
>>>>>>>
>>>>>>>
>>>>>>>  private static void startCamel() throws Exception {
>>>>>>>    CamelContext context = new DefaultCamelContext();
>>>>>>>
>>>>>>>    ActiveMQComponent activemqComponent =
>>>>>>> ActiveMQComponent.activeMQComponent();
>>>>>>>
>>>>>>>  activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>>>>>>> BROKER_NAME));
>>>>>>>    context.addComponent("activemq", activemqComponent);
>>>>>>>
>>>>>>>    final String receiveUrl = "activemq:queue:dest";
>>>>>>>    context.addRoutes(new RouteBuilder() {
>>>>>>>      @Override
>>>>>>>      public void configure() throws Exception {
>>>>>>>        from(receiveUrl).process(new Processor() {
>>>>>>>          @Override
>>>>>>>          public void process(Exchange exchange) throws Exception {
>>>>>>>            System.err.println("received request");
>>>>>>>            exchange.getOut().setBody("reply");
>>>>>>>          }
>>>>>>>        });
>>>>>>>      }
>>>>>>>    });
>>>>>>>
>>>>>>>    context.start();
>>>>>>>    System.err.println("listening on url: " + receiveUrl);
>>>>>>>  }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>>>
>>
>>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by Jim Newsham <jn...@referentia.com>.
On 7/11/2011 7:25 PM, Claus Ibsen wrote:
> On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham<jn...@referentia.com>  wrote:
>> Hi Claus,
>>
>> Thanks for opening the jira issue, and for your comments.  To answer your
>> questions:
>>
>> 1.  We use fixed reply-to queues which are exclusive to Camel.
>> 2.  We need a fixed reply-to queue to avoid losing reply messages in case of
>> disconnection (which would happen with temporary queues because they are
>> scoped to the lifetime of the connection).
>>
> I think we should add an option to the JMS component so you can
> configure that the replyTo is to be consider as exclusive.
>
> ?replyTo=bar&exclusiveReplyTo=true
>
> Something like this. Anyone got a better idea for a name of the option?
That would be fine for me.  :)  I don't have any better ideas regarding 
the parameter name.


> Likewise I have pondered about if we should add an option to have
> convention over configuration? So for example you can configure on the
> jms component, a pattern of the reply to names.
>
> jmsComponent.setExclusiveReplyTo(true);
> jmsComponent.setReplyToPattern("${name}.reply");
>
> So in this example Camel will automatic name the reply to queues, as
> the request queue name + ".reply". So in the example above we can do
>
> from X
> to("jms:queue:bar")
> to Y
>
> So in this example since we configured the component with exclusive
> reply to, and a reply to pattern as well. Then what happens is that
> Camel will use a reply to queue named: bar.reply
>
>
> Of course you can still configure all the options on the endpoint as
> well if you like. And you can override the component settings so in
> case you want a special reply to name you can do:
>
> from X
> to("jms:queue:bar?replyTo=special")
> to Y
>
> Any thoughts?

This wouldn't be useful for me since I don't have a one-to-one 
correspondence between request queues and reply queues, but I can see 
how it would be useful for people who do.  Some people may want the 
ability to override the settings on a particular url, so that a 
temporary queue is used for that route.

Jim

>
>
>> Regards,
>> Jim
>>
>> On 7/9/2011 5:52 AM, Claus Ibsen wrote:
>>> Hi Jim
>>>
>>> I have created a ticket and posted some comments about the issue
>>> https://issues.apache.org/jira/browse/CAMEL-4202
>>>
>>> Are you using a fixed reply to queue that is *exclusive* to the Camel
>>> route?
>>> Or is the queue used for other purposes as well?
>>>
>>> Is there a special reason why you want to use a fixed reply to queue?
>>>
>>>
>>>
>>> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jn...@referentia.com>
>>>   wrote:
>>>> Hi Claus,
>>>>
>>>> I enabled trace logging.  I'm attaching the logs (for both client and
>>>> server; both with and without custom replyTo) as a zip file -- not sure
>>>> if
>>>> the mailing list will filter it, we'll see.
>>>>
>>>> I see that there are 5 messages in the client log which only appear when
>>>> a
>>>> custom replyTo is specified:  "Running purge task to see if any entries
>>>> has
>>>> been timed out", "There are 1 in the timeout map", "did not receive a
>>>> message", etc.  Here's an excerpt from each client log, for one exchange:
>>>>
>>>>   From log for client without replyTo:
>>>>
>>>> 2011-07-08 10:55:32,354 [main] TRACE
>>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>>> 2011-07-08 10:55:32,361 [main] DEBUG
>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>> Executing
>>>> callback on JMS Session: ActiveMQSession
>>>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
>>>> 2011-07-08 10:55:32,361 [main] TRACE
>>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>>> 2011-07-08 10:55:32,362 [main] DEBUG
>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>> Sending
>>>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
>>>> =
>>>> 0, responseRequired = false, messageId = null, originalDestination =
>>>> null,
>>>> originalTransactionId = null, producerId = null, destination = null,
>>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>>> false, type = null, priority = 0, groupID = null, groupSequence = 0,
>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>> null,
>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>>> false, droppable = false, text = abc}
>>>> 2011-07-08 10:55:32,363 [main] DEBUG
>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>>> JMS
>>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>>> 0,
>>>> responseRequired = false, messageId =
>>>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
>>>> null, originalTransactionId = null, producerId = null, destination =
>>>> queue://dest, transactionId = null, expiration = 1310158552362, timestamp
>>>> =
>>>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>> null,
>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>>> false, droppable = false, text = abc}
>>>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
>>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
>>>> Received
>>>> reply message with correlationID:
>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4
>>>> ->    ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>>> messageId =
>>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
>>>> null, originalTransactionId = null, producerId =
>>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
>>>> =
>>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>>> brokerInTime =
>>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>> null,
>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>>> true,
>>>> droppable = false, text = reply}
>>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
>>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>>> TextMessage
>>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>>>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
>>>> originalDestination = null, originalTransactionId = null, producerId =
>>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
>>>> =
>>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>>> brokerInTime =
>>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>>> targetConsumerId = null, compressed = false, userID = null, content =
>>>> null,
>>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>>> true,
>>>> droppable = false, text = reply}
>>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
>>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
>>>> received. Setting reply as OUT message: reply
>>>> received reply in: 0.015 s
>>>>
>>>>   From log for client with replyTo:
>>>>
>>>> 2011-07-08 10:52:10,075 [main] TRACE
>>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>>> 2011-07-08 10:52:10,081 [main] DEBUG
>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>> Executing
>>>> callback on JMS Session: ActiveMQSession
>>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
>>>> 2011-07-08 10:52:10,082 [main] TRACE
>>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>>> 2011-07-08 10:52:10,082 [main] DEBUG
>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>>> Sending
>>>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
>>>> =
>>>> 0, responseRequired = false, messageId = null, originalDestination =
>>>> null,
>>>> originalTransactionId = null, producerId = null, destination = null,
>>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
>>>> persistent = false, type = null, priority = 0, groupID = null,
>>>> groupSequence
>>>> = 0, targetConsumerId = null, compressed = false, userID = null, content
>>>> =
>>>> null, marshalledProperties = null, dataStructure = null,
>>>> redeliveryCounter =
>>>> 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody
>>>> =
>>>> false, droppable = false, text = abc}
>>>> 2011-07-08 10:52:10,083 [main] DEBUG
>>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>>> JMS
>>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>>> 0,
>>>> responseRequired = false, messageId =
>>>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
>>>> null, originalTransactionId = null, producerId = null, destination =
>>>> queue://dest, transactionId = null, expiration = 1310158350082, timestamp
>>>> =
>>>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
>>>> queue://replyQueue, persistent = true, type = null, priority = 4, groupID
>>>> =
>>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>>> userID
>>>> = null, content = null, marshalledProperties = null, dataStructure =
>>>> null,
>>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>>> false, readOnlyBody = false, droppable = false, text = abc}
>>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge task
>>>> to
>>>> see if any entries has been timed out
>>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
>>>> timeout map
>>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
>>>>
>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>>> - Consumer [ActiveMQMessageConsumer {
>>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
>>>> session [ActiveMQSession
>>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
>>>> receive a message
>>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using
>>>>
>>>> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>>
>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>>> - Received message of type [class
>>>> org.apache.activemq.command.ActiveMQTextMessage] from consumer
>>>> [ActiveMQMessageConsumer {
>>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
>>>> session [ActiveMQSession
>>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
>>>> Received
>>>> reply message with correlationID:
>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4
>>>> ->    ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>>> messageId =
>>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
>>>> null, originalTransactionId = null, producerId =
>>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>>>> 1310158331077, correlationId =
>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>>> userID
>>>> = null, content = null, marshalledProperties = null, dataStructure =
>>>> null,
>>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>>> true, readOnlyBody = true, droppable = false, text = reply}
>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
>>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>>> TextMessage
>>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>>>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
>>>> originalDestination = null, originalTransactionId = null, producerId =
>>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>>>> 1310158331077, correlationId =
>>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>>> userID
>>>> = null, content = null, marshalledProperties = null, dataStructure =
>>>> null,
>>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>>> true, readOnlyBody = true, droppable = false, text = reply}
>>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
>>>> received. Setting reply as OUT message: reply
>>>> received reply in: 1.004 s
>>>>
>>>> Thanks,
>>>> Jim
>>>>
>>>>
>>>>
>>>> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
>>>>> Can you enable TRACE logging at org.apache.camel.component.jms and run
>>>>> it for both examples.
>>>>> To see what happens.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
>>>>>   wrote:
>>>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
>>>>>> I
>>>>>> specify a custom replyTo destination on the endpoint url, the time it
>>>>>> takes
>>>>>> for the producer to receive a reply increases drastically.  The curious
>>>>>> thing is that the time to receive a reply is almost exactly 1 second.
>>>>>>   When
>>>>>> I remove the replyTo from the url, everything's fast again.
>>>>>>
>>>>>> I created a very simple, stand-alone test to demonstrate what I'm
>>>>>> seeing.
>>>>>>   There is a server class [4] which runs an embedded instance of
>>>>>> ActiveMQ
>>>>>> and
>>>>>> simply replies to messages as they arrive; and a client [3] class which
>>>>>> simply sends messages to the server, and prints the elapsed time.  The
>>>>>> USE_REPLY_TO symbolic constant in the client determines whether a
>>>>>> replyTo
>>>>>> value is added to the url or not.
>>>>>>
>>>>>> The client output when USE_REPLY_TO is false is shown as [1].  The
>>>>>> client
>>>>>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>>>>>> trivial.  Am I doing something wrong, or is this a Camel and/or
>>>>>> ActiveMQ
>>>>>> issue?
>>>>>>
>>>>>> Thanks!
>>>>>> Jim
>>>>>>
>>>>>>
>>>>>> [1] USE_REPLY_TO = false
>>>>>>
>>>>>> received reply in: 0.476 s
>>>>>> received reply in: 0.006 s
>>>>>> received reply in: 0.006 s
>>>>>> received reply in: 0.006 s
>>>>>> received reply in: 0.006 s
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> [2] USE_REPLY_TO = true
>>>>>>
>>>>>> received reply in: 1.524 s
>>>>>> received reply in: 1.002 s
>>>>>> received reply in: 1.003 s
>>>>>> received reply in: 1.003 s
>>>>>> received reply in: 1.002 s
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> [3] TestReplyToClient.java
>>>>>>
>>>>>> package test;
>>>>>>
>>>>>> import org.apache.activemq.ActiveMQConnectionFactory;
>>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>>> import org.apache.camel.CamelContext;
>>>>>> import org.apache.camel.ProducerTemplate;
>>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>>
>>>>>> public class TestReplyToClient {
>>>>>>
>>>>>>   private static final boolean USE_REPLY_TO = false;
>>>>>>
>>>>>>   public static void main(String... args) throws Exception {
>>>>>>     // create camel context; configure activemq component for
>>>>>> tcp://localhost:7001
>>>>>>     CamelContext context = new DefaultCamelContext();
>>>>>>     ActiveMQComponent activemqComponent =
>>>>>> ActiveMQComponent.activeMQComponent();
>>>>>>     activemqComponent.setConnectionFactory(new
>>>>>> ActiveMQConnectionFactory(
>>>>>>       null, null, "tcp://localhost:7001"));
>>>>>>     context.addComponent("activemq", activemqComponent);
>>>>>>     context.start();
>>>>>>
>>>>>>     // define url to send requests to
>>>>>>     String sendUrl = "activemq:queue:dest";
>>>>>>     if (USE_REPLY_TO) {
>>>>>>       sendUrl += "?replyTo=replyQueue";
>>>>>>     }
>>>>>>     System.err.println("sending to url: " + sendUrl);
>>>>>>
>>>>>>     // repeatedly send requests; measure elapsed time
>>>>>>     ProducerTemplate template = context.createProducerTemplate();
>>>>>>     while (true) {
>>>>>>       long startNanos = System.nanoTime();
>>>>>>       template.requestBody(sendUrl, "abc");
>>>>>>       long elapsedNanos = System.nanoTime() - startNanos;
>>>>>>       System.err.println(String.format("received reply in: %.3f s",
>>>>>> elapsedNanos / 1000000000.0));
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> [4] TestReplyToServer.java
>>>>>>
>>>>>> package test;
>>>>>>
>>>>>> import org.apache.activemq.broker.BrokerService;
>>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>>> import org.apache.camel.CamelContext;
>>>>>> import org.apache.camel.Exchange;
>>>>>> import org.apache.camel.Processor;
>>>>>> import org.apache.camel.builder.RouteBuilder;
>>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>>
>>>>>> public class TestReplyToServer {
>>>>>>
>>>>>>   private static final String BROKER_NAME = "thebroker";
>>>>>>
>>>>>>   public static void main(String... args) throws Exception {
>>>>>>     startBroker();
>>>>>>     startCamel();
>>>>>>     Thread.sleep(Long.MAX_VALUE);
>>>>>>   }
>>>>>>
>>>>>>   private static void startBroker() throws Exception {
>>>>>>     BrokerService brokerService = new BrokerService();
>>>>>>     brokerService.setBrokerName(BROKER_NAME);
>>>>>>     brokerService.setSchedulerSupport(false);
>>>>>>     brokerService.setPersistent(false);
>>>>>>     brokerService.addConnector("tcp://0.0.0.0:7001");
>>>>>>     brokerService.start();
>>>>>>     brokerService.waitUntilStarted();
>>>>>>   }
>>>>>>
>>>>>>
>>>>>>   private static void startCamel() throws Exception {
>>>>>>     CamelContext context = new DefaultCamelContext();
>>>>>>
>>>>>>     ActiveMQComponent activemqComponent =
>>>>>> ActiveMQComponent.activeMQComponent();
>>>>>>     activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>>>>>> BROKER_NAME));
>>>>>>     context.addComponent("activemq", activemqComponent);
>>>>>>
>>>>>>     final String receiveUrl = "activemq:queue:dest";
>>>>>>     context.addRoutes(new RouteBuilder() {
>>>>>>       @Override
>>>>>>       public void configure() throws Exception {
>>>>>>         from(receiveUrl).process(new Processor() {
>>>>>>           @Override
>>>>>>           public void process(Exchange exchange) throws Exception {
>>>>>>             System.err.println("received request");
>>>>>>             exchange.getOut().setBody("reply");
>>>>>>           }
>>>>>>         });
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>>     context.start();
>>>>>>     System.err.println("listening on url: " + receiveUrl);
>>>>>>   }
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>
>>
>
>


Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
On Sun, Jul 17, 2011 at 7:38 PM, David Karlsen <da...@gmail.com> wrote:
> Maybe better to provide a replyto resolver strategyinterface with a few
> implementations for vinning cases?

That could be a good idea, lets see how this goes.


> Den 12. juli 2011 07:26 skrev "Claus Ibsen" <cl...@gmail.com>
> følgende:
>>
>> On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham <jn...@referentia.com>
> wrote:
>> >
>> > Hi Claus,
>> >
>> > Thanks for opening the jira issue, and for your comments.  To answer
> your
>> > questions:
>> >
>> > 1.  We use fixed reply-to queues which are exclusive to Camel.
>> > 2.  We need a fixed reply-to queue to avoid losing reply messages in
> case of
>> > disconnection (which would happen with temporary queues because they are
>> > scoped to the lifetime of the connection).
>> >
>>
>> I think we should add an option to the JMS component so you can
>> configure that the replyTo is to be consider as exclusive.
>>
>> ?replyTo=bar&exclusiveReplyTo=true
>>
>> Something like this. Anyone got a better idea for a name of the option?
>>
>> Likewise I have pondered about if we should add an option to have
>> convention over configuration? So for example you can configure on the
>> jms component, a pattern of the reply to names.
>>
>> jmsComponent.setExclusiveReplyTo(true);
>> jmsComponent.setReplyToPattern("${name}.reply");
>>
>> So in this example Camel will automatic name the reply to queues, as
>> the request queue name + ".reply". So in the example above we can do
>>
>> from X
>> to("jms:queue:bar")
>> to Y
>>
>> So in this example since we configured the component with exclusive
>> reply to, and a reply to pattern as well. Then what happens is that
>> Camel will use a reply to queue named: bar.reply
>>
>>
>> Of course you can still configure all the options on the endpoint as
>> well if you like. And you can override the component settings so in
>> case you want a special reply to name you can do:
>>
>> from X
>> to("jms:queue:bar?replyTo=special")
>> to Y
>>
>> Any thoughts?
>>
>>
>> > Regards,
>> > Jim
>> >
>> > On 7/9/2011 5:52 AM, Claus Ibsen wrote:
>> >>
>> >> Hi Jim
>> >>
>> >> I have created a ticket and posted some comments about the issue
>> >> https://issues.apache.org/jira/browse/CAMEL-4202
>> >>
>> >> Are you using a fixed reply to queue that is *exclusive* to the Camel
>> >> route?
>> >> Or is the queue used for other purposes as well?
>> >>
>> >> Is there a special reason why you want to use a fixed reply to queue?
>> >>
>> >>
>> >>
>> >> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jn...@referentia.com>
>> >>  wrote:
>> >>>
>> >>> Hi Claus,
>> >>>
>> >>> I enabled trace logging.  I'm attaching the logs (for both client and
>> >>> server; both with and without custom replyTo) as a zip file -- not
> sure
>> >>> if
>> >>> the mailing list will filter it, we'll see.
>> >>>
>> >>> I see that there are 5 messages in the client log which only appear
> when
>> >>> a
>> >>> custom replyTo is specified:  "Running purge task to see if any
> entries
>> >>> has
>> >>> been timed out", "There are 1 in the timeout map", "did not receive a
>> >>> message", etc.  Here's an excerpt from each client log, for one
> exchange:
>> >>>
>> >>>  From log for client without replyTo:
>> >>>
>> >>> 2011-07-08 10:55:32,354 [main] TRACE
>> >>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>> >>> 2011-07-08 10:55:32,361 [main] DEBUG
>> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>> >>> Executing
>> >>> callback on JMS Session: ActiveMQSession
>> >>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
>> >>> 2011-07-08 10:55:32,361 [main] TRACE
>> >>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>> >>> 2011-07-08 10:55:32,362 [main] DEBUG
>> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>> >>> Sending
>> >>> JMS message to: queue://dest with message: ActiveMQTextMessage
> {commandId
>> >>> =
>> >>> 0, responseRequired = false, messageId = null, originalDestination =
>> >>> null,
>> >>> originalTransactionId = null, producerId = null, destination = null,
>> >>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>> >>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent
> =
>> >>> false, type = null, priority = 0, groupID = null, groupSequence = 0,
>> >>> targetConsumerId = null, compressed = false, userID = null, content =
>> >>> null,
>> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
> 0,
>> >>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody
> =
>> >>> false, droppable = false, text = abc}
>> >>> 2011-07-08 10:55:32,363 [main] DEBUG
>> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Sent
>> >>> JMS
>> >>> message to: queue://dest with message: ActiveMQTextMessage {commandId
> =
>> >>> 0,
>> >>> responseRequired = false, messageId =
>> >>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination
> =
>> >>> null, originalTransactionId = null, producerId = null, destination =
>> >>> queue://dest, transactionId = null, expiration = 1310158552362,
> timestamp
>> >>> =
>> >>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>> >>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent
> =
>> >>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>> >>> targetConsumerId = null, compressed = false, userID = null, content =
>> >>> null,
>> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
> 0,
>> >>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody
> =
>> >>> false, droppable = false, text = abc}
>> >>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
>> >>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
>> >>> Received
>> >>> reply message with correlationID:
>> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4
>> >>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
>> >>> messageId =
>> >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination
> =
>> >>> null, originalTransactionId = null, producerId =
>> >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1,
> transactionId
>> >>> =
>> >>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>> >>> brokerInTime =
>> >>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent
> =
>> >>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>> >>> targetConsumerId = null, compressed = false, userID = null, content =
>> >>> null,
>> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
> 0,
>> >>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>> >>> true,
>> >>> droppable = false, text = reply}
>> >>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
>> >>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>> >>> TextMessage
>> >>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired
> =
>> >>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
>> >>> originalDestination = null, originalTransactionId = null, producerId =
>> >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1,
> transactionId
>> >>> =
>> >>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>> >>> brokerInTime =
>> >>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent
> =
>> >>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>> >>> targetConsumerId = null, compressed = false, userID = null, content =
>> >>> null,
>> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
> 0,
>> >>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>> >>> true,
>> >>> droppable = false, text = reply}
>> >>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
>> >>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
> Reply
>> >>> received. Setting reply as OUT message: reply
>> >>> received reply in: 0.015 s
>> >>>
>> >>>  From log for client with replyTo:
>> >>>
>> >>> 2011-07-08 10:52:10,075 [main] TRACE
>> >>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>> >>> 2011-07-08 10:52:10,081 [main] DEBUG
>> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>> >>> Executing
>> >>> callback on JMS Session: ActiveMQSession
>> >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
>> >>> 2011-07-08 10:52:10,082 [main] TRACE
>> >>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>> >>> 2011-07-08 10:52:10,082 [main] DEBUG
>> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>> >>> Sending
>> >>> JMS message to: queue://dest with message: ActiveMQTextMessage
> {commandId
>> >>> =
>> >>> 0, responseRequired = false, messageId = null, originalDestination =
>> >>> null,
>> >>> originalTransactionId = null, producerId = null, destination = null,
>> >>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>> >>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
> queue://replyQueue,
>> >>> persistent = false, type = null, priority = 0, groupID = null,
>> >>> groupSequence
>> >>> = 0, targetConsumerId = null, compressed = false, userID = null,
> content
>> >>> =
>> >>> null, marshalledProperties = null, dataStructure = null,
>> >>> redeliveryCounter =
>> >>> 0, size = 0, properties = null, readOnlyProperties = false,
> readOnlyBody
>> >>> =
>> >>> false, droppable = false, text = abc}
>> >>> 2011-07-08 10:52:10,083 [main] DEBUG
>> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Sent
>> >>> JMS
>> >>> message to: queue://dest with message: ActiveMQTextMessage {commandId
> =
>> >>> 0,
>> >>> responseRequired = false, messageId =
>> >>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination
> =
>> >>> null, originalTransactionId = null, producerId = null, destination =
>> >>> queue://dest, transactionId = null, expiration = 1310158350082,
> timestamp
>> >>> =
>> >>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>> >>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
>> >>> queue://replyQueue, persistent = true, type = null, priority = 4,
> groupID
>> >>> =
>> >>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>> >>> userID
>> >>> = null, content = null, marshalledProperties = null, dataStructure =
>> >>> null,
>> >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
> =
>> >>> false, readOnlyBody = false, droppable = false, text = abc}
>> >>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>> >>> JmsReplyManagerTimeoutChecker[dest] TRACE
>> >>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge
> task
>> >>> to
>> >>> see if any entries has been timed out
>> >>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>> >>> JmsReplyManagerTimeoutChecker[dest] TRACE
>> >>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in
> the
>> >>> timeout map
>> >>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
>> >>>
>> >>>
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>> >>> - Consumer [ActiveMQMessageConsumer {
>> >>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }]
> of
>> >>> session [ActiveMQSession
>> >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did
> not
>> >>> receive a message
>> >>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
>> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
> Using
>> >>>
>> >>>
> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
>> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>> >>>
>> >>>
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>> >>> - Received message of type [class
>> >>> org.apache.activemq.command.ActiveMQTextMessage] from consumer
>> >>> [ActiveMQMessageConsumer {
>> >>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }]
> of
>> >>> session [ActiveMQSession
>> >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
>> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
>> >>> Received
>> >>> reply message with correlationID:
>> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4
>> >>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
>> >>> messageId =
>> >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination
> =
>> >>> null, originalTransactionId = null, producerId =
>> >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>> >>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>> >>> 1310158330085, arrival = 0, brokerInTime = 1310158330085,
> brokerOutTime =
>> >>> 1310158331077, correlationId =
>> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>> >>> replyTo = null, persistent = true, type = null, priority = 4, groupID
> =
>> >>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>> >>> userID
>> >>> = null, content = null, marshalledProperties = null, dataStructure =
>> >>> null,
>> >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
> =
>> >>> true, readOnlyBody = true, droppable = false, text = reply}
>> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
>> >>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>> >>> TextMessage
>> >>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired
> =
>> >>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
>> >>> originalDestination = null, originalTransactionId = null, producerId =
>> >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>> >>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>> >>> 1310158330085, arrival = 0, brokerInTime = 1310158330085,
> brokerOutTime =
>> >>> 1310158331077, correlationId =
>> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>> >>> replyTo = null, persistent = true, type = null, priority = 4, groupID
> =
>> >>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>> >>> userID
>> >>> = null, content = null, marshalledProperties = null, dataStructure =
>> >>> null,
>> >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
> =
>> >>> true, readOnlyBody = true, droppable = false, text = reply}
>> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
> Reply
>> >>> received. Setting reply as OUT message: reply
>> >>> received reply in: 1.004 s
>> >>>
>> >>> Thanks,
>> >>> Jim
>> >>>
>> >>>
>> >>>
>> >>> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
>> >>>>
>> >>>> Can you enable TRACE logging at org.apache.camel.component.jms and
> run
>> >>>> it for both examples.
>> >>>> To see what happens.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
>> >>>>  wrote:
>> >>>>>
>> >>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason,
> when
>> >>>>> I
>> >>>>> specify a custom replyTo destination on the endpoint url, the time
> it
>> >>>>> takes
>> >>>>> for the producer to receive a reply increases drastically.  The
> curious
>> >>>>> thing is that the time to receive a reply is almost exactly 1
> second.
>> >>>>>  When
>> >>>>> I remove the replyTo from the url, everything's fast again.
>> >>>>>
>> >>>>> I created a very simple, stand-alone test to demonstrate what I'm
>> >>>>> seeing.
>> >>>>>  There is a server class [4] which runs an embedded instance of
>> >>>>> ActiveMQ
>> >>>>> and
>> >>>>> simply replies to messages as they arrive; and a client [3] class
> which
>> >>>>> simply sends messages to the server, and prints the elapsed time.
>  The
>> >>>>> USE_REPLY_TO symbolic constant in the client determines whether a
>> >>>>> replyTo
>> >>>>> value is added to the url or not.
>> >>>>>
>> >>>>> The client output when USE_REPLY_TO is false is shown as [1].  The
>> >>>>> client
>> >>>>> output when USE_REPLY_TO is true is shown as [2].  The code is
> pretty
>> >>>>> trivial.  Am I doing something wrong, or is this a Camel and/or
>> >>>>> ActiveMQ
>> >>>>> issue?
>> >>>>>
>> >>>>> Thanks!
>> >>>>> Jim
>> >>>>>
>> >>>>>
>> >>>>> [1] USE_REPLY_TO = false
>> >>>>>
>> >>>>> received reply in: 0.476 s
>> >>>>> received reply in: 0.006 s
>> >>>>> received reply in: 0.006 s
>> >>>>> received reply in: 0.006 s
>> >>>>> received reply in: 0.006 s
>> >>>>> ...
>> >>>>>
>> >>>>>
>> >>>>> [2] USE_REPLY_TO = true
>> >>>>>
>> >>>>> received reply in: 1.524 s
>> >>>>> received reply in: 1.002 s
>> >>>>> received reply in: 1.003 s
>> >>>>> received reply in: 1.003 s
>> >>>>> received reply in: 1.002 s
>> >>>>> ...
>> >>>>>
>> >>>>>
>> >>>>> [3] TestReplyToClient.java
>> >>>>>
>> >>>>> package test;
>> >>>>>
>> >>>>> import org.apache.activemq.ActiveMQConnectionFactory;
>> >>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> >>>>> import org.apache.camel.CamelContext;
>> >>>>> import org.apache.camel.ProducerTemplate;
>> >>>>> import org.apache.camel.impl.DefaultCamelContext;
>> >>>>>
>> >>>>> public class TestReplyToClient {
>> >>>>>
>> >>>>>  private static final boolean USE_REPLY_TO = false;
>> >>>>>
>> >>>>>  public static void main(String... args) throws Exception {
>> >>>>>    // create camel context; configure activemq component for
>> >>>>> tcp://localhost:7001
>> >>>>>    CamelContext context = new DefaultCamelContext();
>> >>>>>    ActiveMQComponent activemqComponent =
>> >>>>> ActiveMQComponent.activeMQComponent();
>> >>>>>    activemqComponent.setConnectionFactory(new
>> >>>>> ActiveMQConnectionFactory(
>> >>>>>      null, null, "tcp://localhost:7001"));
>> >>>>>    context.addComponent("activemq", activemqComponent);
>> >>>>>    context.start();
>> >>>>>
>> >>>>>    // define url to send requests to
>> >>>>>    String sendUrl = "activemq:queue:dest";
>> >>>>>    if (USE_REPLY_TO) {
>> >>>>>      sendUrl += "?replyTo=replyQueue";
>> >>>>>    }
>> >>>>>    System.err.println("sending to url: " + sendUrl);
>> >>>>>
>> >>>>>    // repeatedly send requests; measure elapsed time
>> >>>>>    ProducerTemplate template = context.createProducerTemplate();
>> >>>>>    while (true) {
>> >>>>>      long startNanos = System.nanoTime();
>> >>>>>      template.requestBody(sendUrl, "abc");
>> >>>>>      long elapsedNanos = System.nanoTime() - startNanos;
>> >>>>>      System.err.println(String.format("received reply in: %.3f s",
>> >>>>> elapsedNanos / 1000000000.0));
>> >>>>>    }
>> >>>>>  }
>> >>>>>
>> >>>>> }
>> >>>>>
>> >>>>>
>> >>>>> [4] TestReplyToServer.java
>> >>>>>
>> >>>>> package test;
>> >>>>>
>> >>>>> import org.apache.activemq.broker.BrokerService;
>> >>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> >>>>> import org.apache.camel.CamelContext;
>> >>>>> import org.apache.camel.Exchange;
>> >>>>> import org.apache.camel.Processor;
>> >>>>> import org.apache.camel.builder.RouteBuilder;
>> >>>>> import org.apache.camel.impl.DefaultCamelContext;
>> >>>>>
>> >>>>> public class TestReplyToServer {
>> >>>>>
>> >>>>>  private static final String BROKER_NAME = "thebroker";
>> >>>>>
>> >>>>>  public static void main(String... args) throws Exception {
>> >>>>>    startBroker();
>> >>>>>    startCamel();
>> >>>>>    Thread.sleep(Long.MAX_VALUE);
>> >>>>>  }
>> >>>>>
>> >>>>>  private static void startBroker() throws Exception {
>> >>>>>    BrokerService brokerService = new BrokerService();
>> >>>>>    brokerService.setBrokerName(BROKER_NAME);
>> >>>>>    brokerService.setSchedulerSupport(false);
>> >>>>>    brokerService.setPersistent(false);
>> >>>>>    brokerService.addConnector("tcp://0.0.0.0:7001");
>> >>>>>    brokerService.start();
>> >>>>>    brokerService.waitUntilStarted();
>> >>>>>  }
>> >>>>>
>> >>>>>
>> >>>>>  private static void startCamel() throws Exception {
>> >>>>>    CamelContext context = new DefaultCamelContext();
>> >>>>>
>> >>>>>    ActiveMQComponent activemqComponent =
>> >>>>> ActiveMQComponent.activeMQComponent();
>> >>>>>
>  activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>> >>>>> BROKER_NAME));
>> >>>>>    context.addComponent("activemq", activemqComponent);
>> >>>>>
>> >>>>>    final String receiveUrl = "activemq:queue:dest";
>> >>>>>    context.addRoutes(new RouteBuilder() {
>> >>>>>      @Override
>> >>>>>      public void configure() throws Exception {
>> >>>>>        from(receiveUrl).process(new Processor() {
>> >>>>>          @Override
>> >>>>>          public void process(Exchange exchange) throws Exception {
>> >>>>>            System.err.println("received request");
>> >>>>>            exchange.getOut().setBody("reply");
>> >>>>>          }
>> >>>>>        });
>> >>>>>      }
>> >>>>>    });
>> >>>>>
>> >>>>>    context.start();
>> >>>>>    System.err.println("listening on url: " + receiveUrl);
>> >>>>>  }
>> >>>>>
>> >>>>> }
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >
>> >
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cibsen@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by David Karlsen <da...@gmail.com>.
Maybe better to provide a replyto resolver strategyinterface with a few
implementations for vinning cases?
Den 12. juli 2011 07:26 skrev "Claus Ibsen" <cl...@gmail.com>
følgende:
>
> On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham <jn...@referentia.com>
wrote:
> >
> > Hi Claus,
> >
> > Thanks for opening the jira issue, and for your comments.  To answer
your
> > questions:
> >
> > 1.  We use fixed reply-to queues which are exclusive to Camel.
> > 2.  We need a fixed reply-to queue to avoid losing reply messages in
case of
> > disconnection (which would happen with temporary queues because they are
> > scoped to the lifetime of the connection).
> >
>
> I think we should add an option to the JMS component so you can
> configure that the replyTo is to be consider as exclusive.
>
> ?replyTo=bar&exclusiveReplyTo=true
>
> Something like this. Anyone got a better idea for a name of the option?
>
> Likewise I have pondered about if we should add an option to have
> convention over configuration? So for example you can configure on the
> jms component, a pattern of the reply to names.
>
> jmsComponent.setExclusiveReplyTo(true);
> jmsComponent.setReplyToPattern("${name}.reply");
>
> So in this example Camel will automatic name the reply to queues, as
> the request queue name + ".reply". So in the example above we can do
>
> from X
> to("jms:queue:bar")
> to Y
>
> So in this example since we configured the component with exclusive
> reply to, and a reply to pattern as well. Then what happens is that
> Camel will use a reply to queue named: bar.reply
>
>
> Of course you can still configure all the options on the endpoint as
> well if you like. And you can override the component settings so in
> case you want a special reply to name you can do:
>
> from X
> to("jms:queue:bar?replyTo=special")
> to Y
>
> Any thoughts?
>
>
> > Regards,
> > Jim
> >
> > On 7/9/2011 5:52 AM, Claus Ibsen wrote:
> >>
> >> Hi Jim
> >>
> >> I have created a ticket and posted some comments about the issue
> >> https://issues.apache.org/jira/browse/CAMEL-4202
> >>
> >> Are you using a fixed reply to queue that is *exclusive* to the Camel
> >> route?
> >> Or is the queue used for other purposes as well?
> >>
> >> Is there a special reason why you want to use a fixed reply to queue?
> >>
> >>
> >>
> >> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jn...@referentia.com>
> >>  wrote:
> >>>
> >>> Hi Claus,
> >>>
> >>> I enabled trace logging.  I'm attaching the logs (for both client and
> >>> server; both with and without custom replyTo) as a zip file -- not
sure
> >>> if
> >>> the mailing list will filter it, we'll see.
> >>>
> >>> I see that there are 5 messages in the client log which only appear
when
> >>> a
> >>> custom replyTo is specified:  "Running purge task to see if any
entries
> >>> has
> >>> been timed out", "There are 1 in the timeout map", "did not receive a
> >>> message", etc.  Here's an excerpt from each client log, for one
exchange:
> >>>
> >>>  From log for client without replyTo:
> >>>
> >>> 2011-07-08 10:55:32,354 [main] TRACE
> >>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> >>> 2011-07-08 10:55:32,361 [main] DEBUG
> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> >>> Executing
> >>> callback on JMS Session: ActiveMQSession
> >>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
> >>> 2011-07-08 10:55:32,361 [main] TRACE
> >>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> >>> 2011-07-08 10:55:32,362 [main] DEBUG
> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> >>> Sending
> >>> JMS message to: queue://dest with message: ActiveMQTextMessage
{commandId
> >>> =
> >>> 0, responseRequired = false, messageId = null, originalDestination =
> >>> null,
> >>> originalTransactionId = null, producerId = null, destination = null,
> >>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> >>> brokerInTime = 0, brokerOutTime = 0, correlationId =
> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent
=
> >>> false, type = null, priority = 0, groupID = null, groupSequence = 0,
> >>> targetConsumerId = null, compressed = false, userID = null, content =
> >>> null,
> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
0,
> >>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody
=
> >>> false, droppable = false, text = abc}
> >>> 2011-07-08 10:55:32,363 [main] DEBUG
> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
Sent
> >>> JMS
> >>> message to: queue://dest with message: ActiveMQTextMessage {commandId
=
> >>> 0,
> >>> responseRequired = false, messageId =
> >>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination
=
> >>> null, originalTransactionId = null, producerId = null, destination =
> >>> queue://dest, transactionId = null, expiration = 1310158552362,
timestamp
> >>> =
> >>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> >>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent
=
> >>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
> >>> targetConsumerId = null, compressed = false, userID = null, content =
> >>> null,
> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
0,
> >>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody
=
> >>> false, droppable = false, text = abc}
> >>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
> >>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
> >>> Received
> >>> reply message with correlationID:
> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4
> >>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
> >>> messageId =
> >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination
=
> >>> null, originalTransactionId = null, producerId =
> >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1,
transactionId
> >>> =
> >>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
> >>> brokerInTime =
> >>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent
=
> >>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
> >>> targetConsumerId = null, compressed = false, userID = null, content =
> >>> null,
> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
0,
> >>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
> >>> true,
> >>> droppable = false, text = reply}
> >>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
> >>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
> >>> TextMessage
> >>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired
=
> >>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
> >>> originalDestination = null, originalTransactionId = null, producerId =
> >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1,
transactionId
> >>> =
> >>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
> >>> brokerInTime =
> >>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
> >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent
=
> >>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
> >>> targetConsumerId = null, compressed = false, userID = null, content =
> >>> null,
> >>> marshalledProperties = null, dataStructure = null, redeliveryCounter =
0,
> >>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
> >>> true,
> >>> droppable = false, text = reply}
> >>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
> >>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
Reply
> >>> received. Setting reply as OUT message: reply
> >>> received reply in: 0.015 s
> >>>
> >>>  From log for client with replyTo:
> >>>
> >>> 2011-07-08 10:52:10,075 [main] TRACE
> >>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> >>> 2011-07-08 10:52:10,081 [main] DEBUG
> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> >>> Executing
> >>> callback on JMS Session: ActiveMQSession
> >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
> >>> 2011-07-08 10:52:10,082 [main] TRACE
> >>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> >>> 2011-07-08 10:52:10,082 [main] DEBUG
> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> >>> Sending
> >>> JMS message to: queue://dest with message: ActiveMQTextMessage
{commandId
> >>> =
> >>> 0, responseRequired = false, messageId = null, originalDestination =
> >>> null,
> >>> originalTransactionId = null, producerId = null, destination = null,
> >>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> >>> brokerInTime = 0, brokerOutTime = 0, correlationId =
> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
queue://replyQueue,
> >>> persistent = false, type = null, priority = 0, groupID = null,
> >>> groupSequence
> >>> = 0, targetConsumerId = null, compressed = false, userID = null,
content
> >>> =
> >>> null, marshalledProperties = null, dataStructure = null,
> >>> redeliveryCounter =
> >>> 0, size = 0, properties = null, readOnlyProperties = false,
readOnlyBody
> >>> =
> >>> false, droppable = false, text = abc}
> >>> 2011-07-08 10:52:10,083 [main] DEBUG
> >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
Sent
> >>> JMS
> >>> message to: queue://dest with message: ActiveMQTextMessage {commandId
=
> >>> 0,
> >>> responseRequired = false, messageId =
> >>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination
=
> >>> null, originalTransactionId = null, producerId = null, destination =
> >>> queue://dest, transactionId = null, expiration = 1310158350082,
timestamp
> >>> =
> >>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> >>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
> >>> queue://replyQueue, persistent = true, type = null, priority = 4,
groupID
> >>> =
> >>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
> >>> userID
> >>> = null, content = null, marshalledProperties = null, dataStructure =
> >>> null,
> >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
=
> >>> false, readOnlyBody = false, droppable = false, text = abc}
> >>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> >>> JmsReplyManagerTimeoutChecker[dest] TRACE
> >>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge
task
> >>> to
> >>> see if any entries has been timed out
> >>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> >>> JmsReplyManagerTimeoutChecker[dest] TRACE
> >>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in
the
> >>> timeout map
> >>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
> >>>
> >>>
org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> >>> - Consumer [ActiveMQMessageConsumer {
> >>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }]
of
> >>> session [ActiveMQSession
> >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did
not
> >>> receive a message
> >>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
Using
> >>>
> >>>
MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> >>>
> >>>
org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> >>> - Received message of type [class
> >>> org.apache.activemq.command.ActiveMQTextMessage] from consumer
> >>> [ActiveMQMessageConsumer {
> >>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }]
of
> >>> session [ActiveMQSession
> >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
> >>> Received
> >>> reply message with correlationID:
> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4
> >>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
> >>> messageId =
> >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination
=
> >>> null, originalTransactionId = null, producerId =
> >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> >>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> >>> 1310158330085, arrival = 0, brokerInTime = 1310158330085,
brokerOutTime =
> >>> 1310158331077, correlationId =
> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
> >>> replyTo = null, persistent = true, type = null, priority = 4, groupID
=
> >>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
> >>> userID
> >>> = null, content = null, marshalledProperties = null, dataStructure =
> >>> null,
> >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
=
> >>> true, readOnlyBody = true, droppable = false, text = reply}
> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
> >>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
> >>> TextMessage
> >>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired
=
> >>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
> >>> originalDestination = null, originalTransactionId = null, producerId =
> >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> >>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> >>> 1310158330085, arrival = 0, brokerInTime = 1310158330085,
brokerOutTime =
> >>> 1310158331077, correlationId =
> >>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
> >>> replyTo = null, persistent = true, type = null, priority = 4, groupID
=
> >>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
> >>> userID
> >>> = null, content = null, marshalledProperties = null, dataStructure =
> >>> null,
> >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties
=
> >>> true, readOnlyBody = true, droppable = false, text = reply}
> >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
Reply
> >>> received. Setting reply as OUT message: reply
> >>> received reply in: 1.004 s
> >>>
> >>> Thanks,
> >>> Jim
> >>>
> >>>
> >>>
> >>> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
> >>>>
> >>>> Can you enable TRACE logging at org.apache.camel.component.jms and
run
> >>>> it for both examples.
> >>>> To see what happens.
> >>>>
> >>>>
> >>>>
> >>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
> >>>>  wrote:
> >>>>>
> >>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason,
when
> >>>>> I
> >>>>> specify a custom replyTo destination on the endpoint url, the time
it
> >>>>> takes
> >>>>> for the producer to receive a reply increases drastically.  The
curious
> >>>>> thing is that the time to receive a reply is almost exactly 1
second.
> >>>>>  When
> >>>>> I remove the replyTo from the url, everything's fast again.
> >>>>>
> >>>>> I created a very simple, stand-alone test to demonstrate what I'm
> >>>>> seeing.
> >>>>>  There is a server class [4] which runs an embedded instance of
> >>>>> ActiveMQ
> >>>>> and
> >>>>> simply replies to messages as they arrive; and a client [3] class
which
> >>>>> simply sends messages to the server, and prints the elapsed time.
 The
> >>>>> USE_REPLY_TO symbolic constant in the client determines whether a
> >>>>> replyTo
> >>>>> value is added to the url or not.
> >>>>>
> >>>>> The client output when USE_REPLY_TO is false is shown as [1].  The
> >>>>> client
> >>>>> output when USE_REPLY_TO is true is shown as [2].  The code is
pretty
> >>>>> trivial.  Am I doing something wrong, or is this a Camel and/or
> >>>>> ActiveMQ
> >>>>> issue?
> >>>>>
> >>>>> Thanks!
> >>>>> Jim
> >>>>>
> >>>>>
> >>>>> [1] USE_REPLY_TO = false
> >>>>>
> >>>>> received reply in: 0.476 s
> >>>>> received reply in: 0.006 s
> >>>>> received reply in: 0.006 s
> >>>>> received reply in: 0.006 s
> >>>>> received reply in: 0.006 s
> >>>>> ...
> >>>>>
> >>>>>
> >>>>> [2] USE_REPLY_TO = true
> >>>>>
> >>>>> received reply in: 1.524 s
> >>>>> received reply in: 1.002 s
> >>>>> received reply in: 1.003 s
> >>>>> received reply in: 1.003 s
> >>>>> received reply in: 1.002 s
> >>>>> ...
> >>>>>
> >>>>>
> >>>>> [3] TestReplyToClient.java
> >>>>>
> >>>>> package test;
> >>>>>
> >>>>> import org.apache.activemq.ActiveMQConnectionFactory;
> >>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
> >>>>> import org.apache.camel.CamelContext;
> >>>>> import org.apache.camel.ProducerTemplate;
> >>>>> import org.apache.camel.impl.DefaultCamelContext;
> >>>>>
> >>>>> public class TestReplyToClient {
> >>>>>
> >>>>>  private static final boolean USE_REPLY_TO = false;
> >>>>>
> >>>>>  public static void main(String... args) throws Exception {
> >>>>>    // create camel context; configure activemq component for
> >>>>> tcp://localhost:7001
> >>>>>    CamelContext context = new DefaultCamelContext();
> >>>>>    ActiveMQComponent activemqComponent =
> >>>>> ActiveMQComponent.activeMQComponent();
> >>>>>    activemqComponent.setConnectionFactory(new
> >>>>> ActiveMQConnectionFactory(
> >>>>>      null, null, "tcp://localhost:7001"));
> >>>>>    context.addComponent("activemq", activemqComponent);
> >>>>>    context.start();
> >>>>>
> >>>>>    // define url to send requests to
> >>>>>    String sendUrl = "activemq:queue:dest";
> >>>>>    if (USE_REPLY_TO) {
> >>>>>      sendUrl += "?replyTo=replyQueue";
> >>>>>    }
> >>>>>    System.err.println("sending to url: " + sendUrl);
> >>>>>
> >>>>>    // repeatedly send requests; measure elapsed time
> >>>>>    ProducerTemplate template = context.createProducerTemplate();
> >>>>>    while (true) {
> >>>>>      long startNanos = System.nanoTime();
> >>>>>      template.requestBody(sendUrl, "abc");
> >>>>>      long elapsedNanos = System.nanoTime() - startNanos;
> >>>>>      System.err.println(String.format("received reply in: %.3f s",
> >>>>> elapsedNanos / 1000000000.0));
> >>>>>    }
> >>>>>  }
> >>>>>
> >>>>> }
> >>>>>
> >>>>>
> >>>>> [4] TestReplyToServer.java
> >>>>>
> >>>>> package test;
> >>>>>
> >>>>> import org.apache.activemq.broker.BrokerService;
> >>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
> >>>>> import org.apache.camel.CamelContext;
> >>>>> import org.apache.camel.Exchange;
> >>>>> import org.apache.camel.Processor;
> >>>>> import org.apache.camel.builder.RouteBuilder;
> >>>>> import org.apache.camel.impl.DefaultCamelContext;
> >>>>>
> >>>>> public class TestReplyToServer {
> >>>>>
> >>>>>  private static final String BROKER_NAME = "thebroker";
> >>>>>
> >>>>>  public static void main(String... args) throws Exception {
> >>>>>    startBroker();
> >>>>>    startCamel();
> >>>>>    Thread.sleep(Long.MAX_VALUE);
> >>>>>  }
> >>>>>
> >>>>>  private static void startBroker() throws Exception {
> >>>>>    BrokerService brokerService = new BrokerService();
> >>>>>    brokerService.setBrokerName(BROKER_NAME);
> >>>>>    brokerService.setSchedulerSupport(false);
> >>>>>    brokerService.setPersistent(false);
> >>>>>    brokerService.addConnector("tcp://0.0.0.0:7001");
> >>>>>    brokerService.start();
> >>>>>    brokerService.waitUntilStarted();
> >>>>>  }
> >>>>>
> >>>>>
> >>>>>  private static void startCamel() throws Exception {
> >>>>>    CamelContext context = new DefaultCamelContext();
> >>>>>
> >>>>>    ActiveMQComponent activemqComponent =
> >>>>> ActiveMQComponent.activeMQComponent();
> >>>>>
 activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> >>>>> BROKER_NAME));
> >>>>>    context.addComponent("activemq", activemqComponent);
> >>>>>
> >>>>>    final String receiveUrl = "activemq:queue:dest";
> >>>>>    context.addRoutes(new RouteBuilder() {
> >>>>>      @Override
> >>>>>      public void configure() throws Exception {
> >>>>>        from(receiveUrl).process(new Processor() {
> >>>>>          @Override
> >>>>>          public void process(Exchange exchange) throws Exception {
> >>>>>            System.err.println("received request");
> >>>>>            exchange.getOut().setBody("reply");
> >>>>>          }
> >>>>>        });
> >>>>>      }
> >>>>>    });
> >>>>>
> >>>>>    context.start();
> >>>>>    System.err.println("listening on url: " + receiveUrl);
> >>>>>  }
> >>>>>
> >>>>> }
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham <jn...@referentia.com> wrote:
>
> Hi Claus,
>
> Thanks for opening the jira issue, and for your comments.  To answer your
> questions:
>
> 1.  We use fixed reply-to queues which are exclusive to Camel.
> 2.  We need a fixed reply-to queue to avoid losing reply messages in case of
> disconnection (which would happen with temporary queues because they are
> scoped to the lifetime of the connection).
>

I think we should add an option to the JMS component so you can
configure that the replyTo is to be consider as exclusive.

?replyTo=bar&exclusiveReplyTo=true

Something like this. Anyone got a better idea for a name of the option?

Likewise I have pondered about if we should add an option to have
convention over configuration? So for example you can configure on the
jms component, a pattern of the reply to names.

jmsComponent.setExclusiveReplyTo(true);
jmsComponent.setReplyToPattern("${name}.reply");

So in this example Camel will automatic name the reply to queues, as
the request queue name + ".reply". So in the example above we can do

from X
to("jms:queue:bar")
to Y

So in this example since we configured the component with exclusive
reply to, and a reply to pattern as well. Then what happens is that
Camel will use a reply to queue named: bar.reply


Of course you can still configure all the options on the endpoint as
well if you like. And you can override the component settings so in
case you want a special reply to name you can do:

from X
to("jms:queue:bar?replyTo=special")
to Y

Any thoughts?


> Regards,
> Jim
>
> On 7/9/2011 5:52 AM, Claus Ibsen wrote:
>>
>> Hi Jim
>>
>> I have created a ticket and posted some comments about the issue
>> https://issues.apache.org/jira/browse/CAMEL-4202
>>
>> Are you using a fixed reply to queue that is *exclusive* to the Camel
>> route?
>> Or is the queue used for other purposes as well?
>>
>> Is there a special reason why you want to use a fixed reply to queue?
>>
>>
>>
>> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jn...@referentia.com>
>>  wrote:
>>>
>>> Hi Claus,
>>>
>>> I enabled trace logging.  I'm attaching the logs (for both client and
>>> server; both with and without custom replyTo) as a zip file -- not sure
>>> if
>>> the mailing list will filter it, we'll see.
>>>
>>> I see that there are 5 messages in the client log which only appear when
>>> a
>>> custom replyTo is specified:  "Running purge task to see if any entries
>>> has
>>> been timed out", "There are 1 in the timeout map", "did not receive a
>>> message", etc.  Here's an excerpt from each client log, for one exchange:
>>>
>>>  From log for client without replyTo:
>>>
>>> 2011-07-08 10:55:32,354 [main] TRACE
>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>> 2011-07-08 10:55:32,361 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Executing
>>> callback on JMS Session: ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
>>> 2011-07-08 10:55:32,361 [main] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>> 2011-07-08 10:55:32,362 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Sending
>>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
>>> =
>>> 0, responseRequired = false, messageId = null, originalDestination =
>>> null,
>>> originalTransactionId = null, producerId = null, destination = null,
>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>> false, type = null, priority = 0, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>> false, droppable = false, text = abc}
>>> 2011-07-08 10:55:32,363 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>> JMS
>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>> 0,
>>> responseRequired = false, messageId =
>>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
>>> null, originalTransactionId = null, producerId = null, destination =
>>> queue://dest, transactionId = null, expiration = 1310158552362, timestamp
>>> =
>>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>> false, droppable = false, text = abc}
>>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
>>> Received
>>> reply message with correlationID:
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4
>>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>> messageId =
>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
>>> null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
>>> =
>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>> brokerInTime =
>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>> true,
>>> droppable = false, text = reply}
>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>> TextMessage
>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
>>> originalDestination = null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
>>> =
>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>> brokerInTime =
>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>> true,
>>> droppable = false, text = reply}
>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
>>> received. Setting reply as OUT message: reply
>>> received reply in: 0.015 s
>>>
>>>  From log for client with replyTo:
>>>
>>> 2011-07-08 10:52:10,075 [main] TRACE
>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>> 2011-07-08 10:52:10,081 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Executing
>>> callback on JMS Session: ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
>>> 2011-07-08 10:52:10,082 [main] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>> 2011-07-08 10:52:10,082 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Sending
>>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
>>> =
>>> 0, responseRequired = false, messageId = null, originalDestination =
>>> null,
>>> originalTransactionId = null, producerId = null, destination = null,
>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
>>> persistent = false, type = null, priority = 0, groupID = null,
>>> groupSequence
>>> = 0, targetConsumerId = null, compressed = false, userID = null, content
>>> =
>>> null, marshalledProperties = null, dataStructure = null,
>>> redeliveryCounter =
>>> 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody
>>> =
>>> false, droppable = false, text = abc}
>>> 2011-07-08 10:52:10,083 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>> JMS
>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>> 0,
>>> responseRequired = false, messageId =
>>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
>>> null, originalTransactionId = null, producerId = null, destination =
>>> queue://dest, transactionId = null, expiration = 1310158350082, timestamp
>>> =
>>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
>>> queue://replyQueue, persistent = true, type = null, priority = 4, groupID
>>> =
>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>> userID
>>> = null, content = null, marshalledProperties = null, dataStructure =
>>> null,
>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>> false, readOnlyBody = false, droppable = false, text = abc}
>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge task
>>> to
>>> see if any entries has been timed out
>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
>>> timeout map
>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
>>>
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>> - Consumer [ActiveMQMessageConsumer {
>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
>>> session [ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
>>> receive a message
>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using
>>>
>>> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>> - Received message of type [class
>>> org.apache.activemq.command.ActiveMQTextMessage] from consumer
>>> [ActiveMQMessageConsumer {
>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
>>> session [ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
>>> Received
>>> reply message with correlationID:
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4
>>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>> messageId =
>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
>>> null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>>> 1310158331077, correlationId =
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>> userID
>>> = null, content = null, marshalledProperties = null, dataStructure =
>>> null,
>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>> true, readOnlyBody = true, droppable = false, text = reply}
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>> TextMessage
>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
>>> originalDestination = null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>>> 1310158331077, correlationId =
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>> userID
>>> = null, content = null, marshalledProperties = null, dataStructure =
>>> null,
>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>> true, readOnlyBody = true, droppable = false, text = reply}
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
>>> received. Setting reply as OUT message: reply
>>> received reply in: 1.004 s
>>>
>>> Thanks,
>>> Jim
>>>
>>>
>>>
>>> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
>>>>
>>>> Can you enable TRACE logging at org.apache.camel.component.jms and run
>>>> it for both examples.
>>>> To see what happens.
>>>>
>>>>
>>>>
>>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
>>>>  wrote:
>>>>>
>>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
>>>>> I
>>>>> specify a custom replyTo destination on the endpoint url, the time it
>>>>> takes
>>>>> for the producer to receive a reply increases drastically.  The curious
>>>>> thing is that the time to receive a reply is almost exactly 1 second.
>>>>>  When
>>>>> I remove the replyTo from the url, everything's fast again.
>>>>>
>>>>> I created a very simple, stand-alone test to demonstrate what I'm
>>>>> seeing.
>>>>>  There is a server class [4] which runs an embedded instance of
>>>>> ActiveMQ
>>>>> and
>>>>> simply replies to messages as they arrive; and a client [3] class which
>>>>> simply sends messages to the server, and prints the elapsed time.  The
>>>>> USE_REPLY_TO symbolic constant in the client determines whether a
>>>>> replyTo
>>>>> value is added to the url or not.
>>>>>
>>>>> The client output when USE_REPLY_TO is false is shown as [1].  The
>>>>> client
>>>>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>>>>> trivial.  Am I doing something wrong, or is this a Camel and/or
>>>>> ActiveMQ
>>>>> issue?
>>>>>
>>>>> Thanks!
>>>>> Jim
>>>>>
>>>>>
>>>>> [1] USE_REPLY_TO = false
>>>>>
>>>>> received reply in: 0.476 s
>>>>> received reply in: 0.006 s
>>>>> received reply in: 0.006 s
>>>>> received reply in: 0.006 s
>>>>> received reply in: 0.006 s
>>>>> ...
>>>>>
>>>>>
>>>>> [2] USE_REPLY_TO = true
>>>>>
>>>>> received reply in: 1.524 s
>>>>> received reply in: 1.002 s
>>>>> received reply in: 1.003 s
>>>>> received reply in: 1.003 s
>>>>> received reply in: 1.002 s
>>>>> ...
>>>>>
>>>>>
>>>>> [3] TestReplyToClient.java
>>>>>
>>>>> package test;
>>>>>
>>>>> import org.apache.activemq.ActiveMQConnectionFactory;
>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>> import org.apache.camel.CamelContext;
>>>>> import org.apache.camel.ProducerTemplate;
>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>
>>>>> public class TestReplyToClient {
>>>>>
>>>>>  private static final boolean USE_REPLY_TO = false;
>>>>>
>>>>>  public static void main(String... args) throws Exception {
>>>>>    // create camel context; configure activemq component for
>>>>> tcp://localhost:7001
>>>>>    CamelContext context = new DefaultCamelContext();
>>>>>    ActiveMQComponent activemqComponent =
>>>>> ActiveMQComponent.activeMQComponent();
>>>>>    activemqComponent.setConnectionFactory(new
>>>>> ActiveMQConnectionFactory(
>>>>>      null, null, "tcp://localhost:7001"));
>>>>>    context.addComponent("activemq", activemqComponent);
>>>>>    context.start();
>>>>>
>>>>>    // define url to send requests to
>>>>>    String sendUrl = "activemq:queue:dest";
>>>>>    if (USE_REPLY_TO) {
>>>>>      sendUrl += "?replyTo=replyQueue";
>>>>>    }
>>>>>    System.err.println("sending to url: " + sendUrl);
>>>>>
>>>>>    // repeatedly send requests; measure elapsed time
>>>>>    ProducerTemplate template = context.createProducerTemplate();
>>>>>    while (true) {
>>>>>      long startNanos = System.nanoTime();
>>>>>      template.requestBody(sendUrl, "abc");
>>>>>      long elapsedNanos = System.nanoTime() - startNanos;
>>>>>      System.err.println(String.format("received reply in: %.3f s",
>>>>> elapsedNanos / 1000000000.0));
>>>>>    }
>>>>>  }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> [4] TestReplyToServer.java
>>>>>
>>>>> package test;
>>>>>
>>>>> import org.apache.activemq.broker.BrokerService;
>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>> import org.apache.camel.CamelContext;
>>>>> import org.apache.camel.Exchange;
>>>>> import org.apache.camel.Processor;
>>>>> import org.apache.camel.builder.RouteBuilder;
>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>
>>>>> public class TestReplyToServer {
>>>>>
>>>>>  private static final String BROKER_NAME = "thebroker";
>>>>>
>>>>>  public static void main(String... args) throws Exception {
>>>>>    startBroker();
>>>>>    startCamel();
>>>>>    Thread.sleep(Long.MAX_VALUE);
>>>>>  }
>>>>>
>>>>>  private static void startBroker() throws Exception {
>>>>>    BrokerService brokerService = new BrokerService();
>>>>>    brokerService.setBrokerName(BROKER_NAME);
>>>>>    brokerService.setSchedulerSupport(false);
>>>>>    brokerService.setPersistent(false);
>>>>>    brokerService.addConnector("tcp://0.0.0.0:7001");
>>>>>    brokerService.start();
>>>>>    brokerService.waitUntilStarted();
>>>>>  }
>>>>>
>>>>>
>>>>>  private static void startCamel() throws Exception {
>>>>>    CamelContext context = new DefaultCamelContext();
>>>>>
>>>>>    ActiveMQComponent activemqComponent =
>>>>> ActiveMQComponent.activeMQComponent();
>>>>>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>>>>> BROKER_NAME));
>>>>>    context.addComponent("activemq", activemqComponent);
>>>>>
>>>>>    final String receiveUrl = "activemq:queue:dest";
>>>>>    context.addRoutes(new RouteBuilder() {
>>>>>      @Override
>>>>>      public void configure() throws Exception {
>>>>>        from(receiveUrl).process(new Processor() {
>>>>>          @Override
>>>>>          public void process(Exchange exchange) throws Exception {
>>>>>            System.err.println("received request");
>>>>>            exchange.getOut().setBody("reply");
>>>>>          }
>>>>>        });
>>>>>      }
>>>>>    });
>>>>>
>>>>>    context.start();
>>>>>    System.err.println("listening on url: " + receiveUrl);
>>>>>  }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by Jim Newsham <jn...@referentia.com>.
Hi Claus,

Thanks for opening the jira issue, and for your comments.  To answer 
your questions:

1.  We use fixed reply-to queues which are exclusive to Camel.
2.  We need a fixed reply-to queue to avoid losing reply messages in 
case of disconnection (which would happen with temporary queues because 
they are scoped to the lifetime of the connection).

Regards,
Jim

On 7/9/2011 5:52 AM, Claus Ibsen wrote:
> Hi Jim
>
> I have created a ticket and posted some comments about the issue
> https://issues.apache.org/jira/browse/CAMEL-4202
>
> Are you using a fixed reply to queue that is *exclusive* to the Camel route?
> Or is the queue used for other purposes as well?
>
> Is there a special reason why you want to use a fixed reply to queue?
>
>
>
> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jn...@referentia.com>  wrote:
>> Hi Claus,
>>
>> I enabled trace logging.  I'm attaching the logs (for both client and
>> server; both with and without custom replyTo) as a zip file -- not sure if
>> the mailing list will filter it, we'll see.
>>
>> I see that there are 5 messages in the client log which only appear when a
>> custom replyTo is specified:  "Running purge task to see if any entries has
>> been timed out", "There are 1 in the timeout map", "did not receive a
>> message", etc.  Here's an excerpt from each client log, for one exchange:
>>
>>  From log for client without replyTo:
>>
>> 2011-07-08 10:55:32,354 [main] TRACE
>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>> 2011-07-08 10:55:32,361 [main] DEBUG
>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Executing
>> callback on JMS Session: ActiveMQSession
>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
>> 2011-07-08 10:55:32,361 [main] TRACE
>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>> 2011-07-08 10:55:32,362 [main] DEBUG
>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sending
>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId =
>> 0, responseRequired = false, messageId = null, originalDestination = null,
>> originalTransactionId = null, producerId = null, destination = null,
>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>> false, type = null, priority = 0, groupID = null, groupSequence = 0,
>> targetConsumerId = null, compressed = false, userID = null, content = null,
>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>> false, droppable = false, text = abc}
>> 2011-07-08 10:55:32,363 [main] DEBUG
>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent JMS
>> message to: queue://dest with message: ActiveMQTextMessage {commandId = 0,
>> responseRequired = false, messageId =
>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
>> null, originalTransactionId = null, producerId = null, destination =
>> queue://dest, transactionId = null, expiration = 1310158552362, timestamp =
>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>> targetConsumerId = null, compressed = false, userID = null, content = null,
>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>> false, droppable = false, text = abc}
>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Received
>> reply message with correlationID: ID-rsi-eng-newsham-61472-1310158530715-0-4
>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId =
>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
>> null, originalTransactionId = null, producerId =
>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId =
>> null, expiration = 0, timestamp = 1310158532367, arrival = 0, brokerInTime =
>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>> targetConsumerId = null, compressed = false, userID = null, content = null,
>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true,
>> droppable = false, text = reply}
>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
>> org.apache.camel.component.jms.JmsBinding - Extracting body as a TextMessage
>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
>> originalDestination = null, originalTransactionId = null, producerId =
>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId =
>> null, expiration = 0, timestamp = 1310158532367, arrival = 0, brokerInTime =
>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>> targetConsumerId = null, compressed = false, userID = null, content = null,
>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true,
>> droppable = false, text = reply}
>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
>> received. Setting reply as OUT message: reply
>> received reply in: 0.015 s
>>
>>  From log for client with replyTo:
>>
>> 2011-07-08 10:52:10,075 [main] TRACE
>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>> 2011-07-08 10:52:10,081 [main] DEBUG
>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Executing
>> callback on JMS Session: ActiveMQSession
>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
>> 2011-07-08 10:52:10,082 [main] TRACE
>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>> 2011-07-08 10:52:10,082 [main] DEBUG
>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sending
>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId =
>> 0, responseRequired = false, messageId = null, originalDestination = null,
>> originalTransactionId = null, producerId = null, destination = null,
>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
>> persistent = false, type = null, priority = 0, groupID = null, groupSequence
>> = 0, targetConsumerId = null, compressed = false, userID = null, content =
>> null, marshalledProperties = null, dataStructure = null, redeliveryCounter =
>> 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>> false, droppable = false, text = abc}
>> 2011-07-08 10:52:10,083 [main] DEBUG
>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent JMS
>> message to: queue://dest with message: ActiveMQTextMessage {commandId = 0,
>> responseRequired = false, messageId =
>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
>> null, originalTransactionId = null, producerId = null, destination =
>> queue://dest, transactionId = null, expiration = 1310158350082, timestamp =
>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
>> queue://replyQueue, persistent = true, type = null, priority = 4, groupID =
>> null, groupSequence = 0, targetConsumerId = null, compressed = false, userID
>> = null, content = null, marshalledProperties = null, dataStructure = null,
>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>> false, readOnlyBody = false, droppable = false, text = abc}
>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>> JmsReplyManagerTimeoutChecker[dest] TRACE
>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge task to
>> see if any entries has been timed out
>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>> JmsReplyManagerTimeoutChecker[dest] TRACE
>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
>> timeout map
>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>> - Consumer [ActiveMQMessageConsumer {
>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
>> session [ActiveMQSession
>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
>> receive a message
>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using
>> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>> - Received message of type [class
>> org.apache.activemq.command.ActiveMQTextMessage] from consumer
>> [ActiveMQMessageConsumer {
>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
>> session [ActiveMQSession
>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Received
>> reply message with correlationID: ID-rsi-eng-newsham-61454-1310158327407-0-4
>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId =
>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
>> null, originalTransactionId = null, producerId =
>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>> 1310158331077, correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4,
>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>> null, groupSequence = 0, targetConsumerId = null, compressed = false, userID
>> = null, content = null, marshalledProperties = null, dataStructure = null,
>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>> true, readOnlyBody = true, droppable = false, text = reply}
>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
>> org.apache.camel.component.jms.JmsBinding - Extracting body as a TextMessage
>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
>> originalDestination = null, originalTransactionId = null, producerId =
>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>> 1310158331077, correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4,
>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>> null, groupSequence = 0, targetConsumerId = null, compressed = false, userID
>> = null, content = null, marshalledProperties = null, dataStructure = null,
>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>> true, readOnlyBody = true, droppable = false, text = reply}
>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
>> received. Setting reply as OUT message: reply
>> received reply in: 1.004 s
>>
>> Thanks,
>> Jim
>>
>>
>>
>> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
>>> Can you enable TRACE logging at org.apache.camel.component.jms and run
>>> it for both examples.
>>> To see what happens.
>>>
>>>
>>>
>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
>>>   wrote:
>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
>>>> specify a custom replyTo destination on the endpoint url, the time it
>>>> takes
>>>> for the producer to receive a reply increases drastically.  The curious
>>>> thing is that the time to receive a reply is almost exactly 1 second.
>>>>   When
>>>> I remove the replyTo from the url, everything's fast again.
>>>>
>>>> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>>>>   There is a server class [4] which runs an embedded instance of ActiveMQ
>>>> and
>>>> simply replies to messages as they arrive; and a client [3] class which
>>>> simply sends messages to the server, and prints the elapsed time.  The
>>>> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
>>>> value is added to the url or not.
>>>>
>>>> The client output when USE_REPLY_TO is false is shown as [1].  The client
>>>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>>>> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
>>>> issue?
>>>>
>>>> Thanks!
>>>> Jim
>>>>
>>>>
>>>> [1] USE_REPLY_TO = false
>>>>
>>>> received reply in: 0.476 s
>>>> received reply in: 0.006 s
>>>> received reply in: 0.006 s
>>>> received reply in: 0.006 s
>>>> received reply in: 0.006 s
>>>> ...
>>>>
>>>>
>>>> [2] USE_REPLY_TO = true
>>>>
>>>> received reply in: 1.524 s
>>>> received reply in: 1.002 s
>>>> received reply in: 1.003 s
>>>> received reply in: 1.003 s
>>>> received reply in: 1.002 s
>>>> ...
>>>>
>>>>
>>>> [3] TestReplyToClient.java
>>>>
>>>> package test;
>>>>
>>>> import org.apache.activemq.ActiveMQConnectionFactory;
>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>> import org.apache.camel.CamelContext;
>>>> import org.apache.camel.ProducerTemplate;
>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>
>>>> public class TestReplyToClient {
>>>>
>>>>   private static final boolean USE_REPLY_TO = false;
>>>>
>>>>   public static void main(String... args) throws Exception {
>>>>     // create camel context; configure activemq component for
>>>> tcp://localhost:7001
>>>>     CamelContext context = new DefaultCamelContext();
>>>>     ActiveMQComponent activemqComponent =
>>>> ActiveMQComponent.activeMQComponent();
>>>>     activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>>>>       null, null, "tcp://localhost:7001"));
>>>>     context.addComponent("activemq", activemqComponent);
>>>>     context.start();
>>>>
>>>>     // define url to send requests to
>>>>     String sendUrl = "activemq:queue:dest";
>>>>     if (USE_REPLY_TO) {
>>>>       sendUrl += "?replyTo=replyQueue";
>>>>     }
>>>>     System.err.println("sending to url: " + sendUrl);
>>>>
>>>>     // repeatedly send requests; measure elapsed time
>>>>     ProducerTemplate template = context.createProducerTemplate();
>>>>     while (true) {
>>>>       long startNanos = System.nanoTime();
>>>>       template.requestBody(sendUrl, "abc");
>>>>       long elapsedNanos = System.nanoTime() - startNanos;
>>>>       System.err.println(String.format("received reply in: %.3f s",
>>>> elapsedNanos / 1000000000.0));
>>>>     }
>>>>   }
>>>>
>>>> }
>>>>
>>>>
>>>> [4] TestReplyToServer.java
>>>>
>>>> package test;
>>>>
>>>> import org.apache.activemq.broker.BrokerService;
>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>> import org.apache.camel.CamelContext;
>>>> import org.apache.camel.Exchange;
>>>> import org.apache.camel.Processor;
>>>> import org.apache.camel.builder.RouteBuilder;
>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>
>>>> public class TestReplyToServer {
>>>>
>>>>   private static final String BROKER_NAME = "thebroker";
>>>>
>>>>   public static void main(String... args) throws Exception {
>>>>     startBroker();
>>>>     startCamel();
>>>>     Thread.sleep(Long.MAX_VALUE);
>>>>   }
>>>>
>>>>   private static void startBroker() throws Exception {
>>>>     BrokerService brokerService = new BrokerService();
>>>>     brokerService.setBrokerName(BROKER_NAME);
>>>>     brokerService.setSchedulerSupport(false);
>>>>     brokerService.setPersistent(false);
>>>>     brokerService.addConnector("tcp://0.0.0.0:7001");
>>>>     brokerService.start();
>>>>     brokerService.waitUntilStarted();
>>>>   }
>>>>
>>>>
>>>>   private static void startCamel() throws Exception {
>>>>     CamelContext context = new DefaultCamelContext();
>>>>
>>>>     ActiveMQComponent activemqComponent =
>>>> ActiveMQComponent.activeMQComponent();
>>>>     activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>>>> BROKER_NAME));
>>>>     context.addComponent("activemq", activemqComponent);
>>>>
>>>>     final String receiveUrl = "activemq:queue:dest";
>>>>     context.addRoutes(new RouteBuilder() {
>>>>       @Override
>>>>       public void configure() throws Exception {
>>>>         from(receiveUrl).process(new Processor() {
>>>>           @Override
>>>>           public void process(Exchange exchange) throws Exception {
>>>>             System.err.println("received request");
>>>>             exchange.getOut().setBody("reply");
>>>>           }
>>>>         });
>>>>       }
>>>>     });
>>>>
>>>>     context.start();
>>>>     System.err.println("listening on url: " + receiveUrl);
>>>>   }
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
>


Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
Hi Jim

I have created a ticket and posted some comments about the issue
https://issues.apache.org/jira/browse/CAMEL-4202

Are you using a fixed reply to queue that is *exclusive* to the Camel route?
Or is the queue used for other purposes as well?

Is there a special reason why you want to use a fixed reply to queue?



On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham <jn...@referentia.com> wrote:
>
> Hi Claus,
>
> I enabled trace logging.  I'm attaching the logs (for both client and
> server; both with and without custom replyTo) as a zip file -- not sure if
> the mailing list will filter it, we'll see.
>
> I see that there are 5 messages in the client log which only appear when a
> custom replyTo is specified:  "Running purge task to see if any entries has
> been timed out", "There are 1 in the timeout map", "did not receive a
> message", etc.  Here's an excerpt from each client log, for one exchange:
>
> From log for client without replyTo:
>
> 2011-07-08 10:55:32,354 [main] TRACE
> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> 2011-07-08 10:55:32,361 [main] DEBUG
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Executing
> callback on JMS Session: ActiveMQSession
> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
> 2011-07-08 10:55:32,361 [main] TRACE
> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> 2011-07-08 10:55:32,362 [main] DEBUG
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sending
> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId =
> 0, responseRequired = false, messageId = null, originalDestination = null,
> originalTransactionId = null, producerId = null, destination = null,
> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> brokerInTime = 0, brokerOutTime = 0, correlationId =
> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
> false, type = null, priority = 0, groupID = null, groupSequence = 0,
> targetConsumerId = null, compressed = false, userID = null, content = null,
> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
> false, droppable = false, text = abc}
> 2011-07-08 10:55:32,363 [main] DEBUG
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent JMS
> message to: queue://dest with message: ActiveMQTextMessage {commandId = 0,
> responseRequired = false, messageId =
> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
> null, originalTransactionId = null, producerId = null, destination =
> queue://dest, transactionId = null, expiration = 1310158552362, timestamp =
> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
> true, type = null, priority = 4, groupID = null, groupSequence = 0,
> targetConsumerId = null, compressed = false, userID = null, content = null,
> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
> false, droppable = false, text = abc}
> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Received
> reply message with correlationID: ID-rsi-eng-newsham-61472-1310158530715-0-4
> -> ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId =
> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
> null, originalTransactionId = null, producerId =
> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId =
> null, expiration = 0, timestamp = 1310158532367, arrival = 0, brokerInTime =
> 1310158532367, brokerOutTime = 1310158532367, correlationId =
> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
> true, type = null, priority = 4, groupID = null, groupSequence = 0,
> targetConsumerId = null, compressed = false, userID = null, content = null,
> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true,
> droppable = false, text = reply}
> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
> org.apache.camel.component.jms.JmsBinding - Extracting body as a TextMessage
> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
> originalDestination = null, originalTransactionId = null, producerId =
> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId =
> null, expiration = 0, timestamp = 1310158532367, arrival = 0, brokerInTime =
> 1310158532367, brokerOutTime = 1310158532367, correlationId =
> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
> true, type = null, priority = 4, groupID = null, groupSequence = 0,
> targetConsumerId = null, compressed = false, userID = null, content = null,
> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true,
> droppable = false, text = reply}
> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
> received. Setting reply as OUT message: reply
> received reply in: 0.015 s
>
> From log for client with replyTo:
>
> 2011-07-08 10:52:10,075 [main] TRACE
> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> 2011-07-08 10:52:10,081 [main] DEBUG
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Executing
> callback on JMS Session: ActiveMQSession
> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
> 2011-07-08 10:52:10,082 [main] TRACE
> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> 2011-07-08 10:52:10,082 [main] DEBUG
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sending
> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId =
> 0, responseRequired = false, messageId = null, originalDestination = null,
> originalTransactionId = null, producerId = null, destination = null,
> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> brokerInTime = 0, brokerOutTime = 0, correlationId =
> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
> persistent = false, type = null, priority = 0, groupID = null, groupSequence
> = 0, targetConsumerId = null, compressed = false, userID = null, content =
> null, marshalledProperties = null, dataStructure = null, redeliveryCounter =
> 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
> false, droppable = false, text = abc}
> 2011-07-08 10:52:10,083 [main] DEBUG
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent JMS
> message to: queue://dest with message: ActiveMQTextMessage {commandId = 0,
> responseRequired = false, messageId =
> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
> null, originalTransactionId = null, producerId = null, destination =
> queue://dest, transactionId = null, expiration = 1310158350082, timestamp =
> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
> queue://replyQueue, persistent = true, type = null, priority = 4, groupID =
> null, groupSequence = 0, targetConsumerId = null, compressed = false, userID
> = null, content = null, marshalledProperties = null, dataStructure = null,
> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> false, readOnlyBody = false, droppable = false, text = abc}
> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> JmsReplyManagerTimeoutChecker[dest] TRACE
> org.apache.camel.component.jms.reply.CorrelationMap - Running purge task to
> see if any entries has been timed out
> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> JmsReplyManagerTimeoutChecker[dest] TRACE
> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
> timeout map
> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> - Consumer [ActiveMQMessageConsumer {
> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
> session [ActiveMQSession
> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
> receive a message
> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using
> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> - Received message of type [class
> org.apache.activemq.command.ActiveMQTextMessage] from consumer
> [ActiveMQMessageConsumer {
> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
> session [ActiveMQSession
> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Received
> reply message with correlationID: ID-rsi-eng-newsham-61454-1310158327407-0-4
> -> ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId =
> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
> null, originalTransactionId = null, producerId =
> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
> 1310158331077, correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4,
> replyTo = null, persistent = true, type = null, priority = 4, groupID =
> null, groupSequence = 0, targetConsumerId = null, compressed = false, userID
> = null, content = null, marshalledProperties = null, dataStructure = null,
> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> true, readOnlyBody = true, droppable = false, text = reply}
> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
> org.apache.camel.component.jms.JmsBinding - Extracting body as a TextMessage
> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
> originalDestination = null, originalTransactionId = null, producerId =
> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
> 1310158331077, correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4,
> replyTo = null, persistent = true, type = null, priority = 4, groupID =
> null, groupSequence = 0, targetConsumerId = null, compressed = false, userID
> = null, content = null, marshalledProperties = null, dataStructure = null,
> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> true, readOnlyBody = true, droppable = false, text = reply}
> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
> received. Setting reply as OUT message: reply
> received reply in: 1.004 s
>
> Thanks,
> Jim
>
>
>
> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
>>
>> Can you enable TRACE logging at org.apache.camel.component.jms and run
>> it for both examples.
>> To see what happens.
>>
>>
>>
>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>
>>  wrote:
>>>
>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
>>> specify a custom replyTo destination on the endpoint url, the time it
>>> takes
>>> for the producer to receive a reply increases drastically.  The curious
>>> thing is that the time to receive a reply is almost exactly 1 second.
>>>  When
>>> I remove the replyTo from the url, everything's fast again.
>>>
>>> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>>>  There is a server class [4] which runs an embedded instance of ActiveMQ
>>> and
>>> simply replies to messages as they arrive; and a client [3] class which
>>> simply sends messages to the server, and prints the elapsed time.  The
>>> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
>>> value is added to the url or not.
>>>
>>> The client output when USE_REPLY_TO is false is shown as [1].  The client
>>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>>> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
>>> issue?
>>>
>>> Thanks!
>>> Jim
>>>
>>>
>>> [1] USE_REPLY_TO = false
>>>
>>> received reply in: 0.476 s
>>> received reply in: 0.006 s
>>> received reply in: 0.006 s
>>> received reply in: 0.006 s
>>> received reply in: 0.006 s
>>> ...
>>>
>>>
>>> [2] USE_REPLY_TO = true
>>>
>>> received reply in: 1.524 s
>>> received reply in: 1.002 s
>>> received reply in: 1.003 s
>>> received reply in: 1.003 s
>>> received reply in: 1.002 s
>>> ...
>>>
>>>
>>> [3] TestReplyToClient.java
>>>
>>> package test;
>>>
>>> import org.apache.activemq.ActiveMQConnectionFactory;
>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>> import org.apache.camel.CamelContext;
>>> import org.apache.camel.ProducerTemplate;
>>> import org.apache.camel.impl.DefaultCamelContext;
>>>
>>> public class TestReplyToClient {
>>>
>>>  private static final boolean USE_REPLY_TO = false;
>>>
>>>  public static void main(String... args) throws Exception {
>>>    // create camel context; configure activemq component for
>>> tcp://localhost:7001
>>>    CamelContext context = new DefaultCamelContext();
>>>    ActiveMQComponent activemqComponent =
>>> ActiveMQComponent.activeMQComponent();
>>>    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>>>      null, null, "tcp://localhost:7001"));
>>>    context.addComponent("activemq", activemqComponent);
>>>    context.start();
>>>
>>>    // define url to send requests to
>>>    String sendUrl = "activemq:queue:dest";
>>>    if (USE_REPLY_TO) {
>>>      sendUrl += "?replyTo=replyQueue";
>>>    }
>>>    System.err.println("sending to url: " + sendUrl);
>>>
>>>    // repeatedly send requests; measure elapsed time
>>>    ProducerTemplate template = context.createProducerTemplate();
>>>    while (true) {
>>>      long startNanos = System.nanoTime();
>>>      template.requestBody(sendUrl, "abc");
>>>      long elapsedNanos = System.nanoTime() - startNanos;
>>>      System.err.println(String.format("received reply in: %.3f s",
>>> elapsedNanos / 1000000000.0));
>>>    }
>>>  }
>>>
>>> }
>>>
>>>
>>> [4] TestReplyToServer.java
>>>
>>> package test;
>>>
>>> import org.apache.activemq.broker.BrokerService;
>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>> import org.apache.camel.CamelContext;
>>> import org.apache.camel.Exchange;
>>> import org.apache.camel.Processor;
>>> import org.apache.camel.builder.RouteBuilder;
>>> import org.apache.camel.impl.DefaultCamelContext;
>>>
>>> public class TestReplyToServer {
>>>
>>>  private static final String BROKER_NAME = "thebroker";
>>>
>>>  public static void main(String... args) throws Exception {
>>>    startBroker();
>>>    startCamel();
>>>    Thread.sleep(Long.MAX_VALUE);
>>>  }
>>>
>>>  private static void startBroker() throws Exception {
>>>    BrokerService brokerService = new BrokerService();
>>>    brokerService.setBrokerName(BROKER_NAME);
>>>    brokerService.setSchedulerSupport(false);
>>>    brokerService.setPersistent(false);
>>>    brokerService.addConnector("tcp://0.0.0.0:7001");
>>>    brokerService.start();
>>>    brokerService.waitUntilStarted();
>>>  }
>>>
>>>
>>>  private static void startCamel() throws Exception {
>>>    CamelContext context = new DefaultCamelContext();
>>>
>>>    ActiveMQComponent activemqComponent =
>>> ActiveMQComponent.activeMQComponent();
>>>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>>> BROKER_NAME));
>>>    context.addComponent("activemq", activemqComponent);
>>>
>>>    final String receiveUrl = "activemq:queue:dest";
>>>    context.addRoutes(new RouteBuilder() {
>>>      @Override
>>>      public void configure() throws Exception {
>>>        from(receiveUrl).process(new Processor() {
>>>          @Override
>>>          public void process(Exchange exchange) throws Exception {
>>>            System.err.println("received request");
>>>            exchange.getOut().setBody("reply");
>>>          }
>>>        });
>>>      }
>>>    });
>>>
>>>    context.start();
>>>    System.err.println("listening on url: " + receiveUrl);
>>>  }
>>>
>>> }
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by Jim Newsham <jn...@referentia.com>.
Hi Claus,

I enabled trace logging.  I'm attaching the logs (for both client and 
server; both with and without custom replyTo) as a zip file -- not sure 
if the mailing list will filter it, we'll see.

I see that there are 5 messages in the client log which only appear when 
a custom replyTo is specified:  "Running purge task to see if any 
entries has been timed out", "There are 1 in the timeout map", "did not 
receive a message", etc.  Here's an excerpt from each client log, for 
one exchange:

 From log for client without replyTo:

2011-07-08 10:55:32,354 [main] TRACE 
org.apache.camel.component.jms.JmsProducer - Using inOut jms template
2011-07-08 10:55:32,361 [main] DEBUG 
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - 
Executing callback on JMS Session: ActiveMQSession 
{id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
2011-07-08 10:55:32,361 [main] TRACE 
org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
2011-07-08 10:55:32,362 [main] DEBUG 
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - 
Sending JMS message to: queue://dest with message: ActiveMQTextMessage 
{commandId = 0, responseRequired = false, messageId = null, 
originalDestination = null, originalTransactionId = null, producerId = 
null, destination = null, transactionId = null, expiration = 0, 
timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, 
correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = 
temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent = 
false, type = null, priority = 0, groupID = null, groupSequence = 0, 
targetConsumerId = null, compressed = false, userID = null, content = 
null, marshalledProperties = null, dataStructure = null, 
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = 
false, readOnlyBody = false, droppable = false, text = abc}
2011-07-08 10:55:32,363 [main] DEBUG 
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent 
JMS message to: queue://dest with message: ActiveMQTextMessage 
{commandId = 0, responseRequired = false, messageId = 
ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination = 
null, originalTransactionId = null, producerId = null, destination = 
queue://dest, transactionId = null, expiration = 1310158552362, 
timestamp = 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime 
= 0, correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo 
= temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent 
= true, type = null, priority = 4, groupID = null, groupSequence = 0, 
targetConsumerId = null, compressed = false, userID = null, content = 
null, marshalledProperties = null, dataStructure = null, 
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = 
false, readOnlyBody = false, droppable = false, text = abc}
2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG 
org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - 
Received reply message with correlationID: 
ID-rsi-eng-newsham-61472-1310158530715-0-4 -> ActiveMQTextMessage 
{commandId = 9, responseRequired = true, messageId = 
ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination = 
null, originalTransactionId = null, producerId = 
ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination = 
temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId 
= null, expiration = 0, timestamp = 1310158532367, arrival = 0, 
brokerInTime = 1310158532367, brokerOutTime = 1310158532367, 
correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = 
null, persistent = true, type = null, priority = 4, groupID = null, 
groupSequence = 0, targetConsumerId = null, compressed = false, userID = 
null, content = null, marshalledProperties = null, dataStructure = null, 
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = 
true, readOnlyBody = true, droppable = false, text = reply}
2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE 
org.apache.camel.component.jms.JmsBinding - Extracting body as a 
TextMessage from JMS message: ActiveMQTextMessage {commandId = 9, 
responseRequired = true, messageId = 
ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination = 
null, originalTransactionId = null, producerId = 
ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination = 
temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId 
= null, expiration = 0, timestamp = 1310158532367, arrival = 0, 
brokerInTime = 1310158532367, brokerOutTime = 1310158532367, 
correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = 
null, persistent = true, type = null, priority = 4, groupID = null, 
groupSequence = 0, targetConsumerId = null, compressed = false, userID = 
null, content = null, marshalledProperties = null, dataStructure = null, 
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = 
true, readOnlyBody = true, droppable = false, text = reply}
2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG 
org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply 
received. Setting reply as OUT message: reply
received reply in: 0.015 s

 From log for client with replyTo:

2011-07-08 10:52:10,075 [main] TRACE 
org.apache.camel.component.jms.JmsProducer - Using inOut jms template
2011-07-08 10:52:10,081 [main] DEBUG 
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - 
Executing callback on JMS Session: ActiveMQSession 
{id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
2011-07-08 10:52:10,082 [main] TRACE 
org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
2011-07-08 10:52:10,082 [main] DEBUG 
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - 
Sending JMS message to: queue://dest with message: ActiveMQTextMessage 
{commandId = 0, responseRequired = false, messageId = null, 
originalDestination = null, originalTransactionId = null, producerId = 
null, destination = null, transactionId = null, expiration = 0, 
timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, 
correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = 
queue://replyQueue, persistent = false, type = null, priority = 0, 
groupID = null, groupSequence = 0, targetConsumerId = null, compressed = 
false, userID = null, content = null, marshalledProperties = null, 
dataStructure = null, redeliveryCounter = 0, size = 0, properties = 
null, readOnlyProperties = false, readOnlyBody = false, droppable = 
false, text = abc}
2011-07-08 10:52:10,083 [main] DEBUG 
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent 
JMS message to: queue://dest with message: ActiveMQTextMessage 
{commandId = 0, responseRequired = false, messageId = 
ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination = 
null, originalTransactionId = null, producerId = null, destination = 
queue://dest, transactionId = null, expiration = 1310158350082, 
timestamp = 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime 
= 0, correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo 
= queue://replyQueue, persistent = true, type = null, priority = 4, 
groupID = null, groupSequence = 0, targetConsumerId = null, compressed = 
false, userID = null, content = null, marshalledProperties = null, 
dataStructure = null, redeliveryCounter = 0, size = 0, properties = 
null, readOnlyProperties = false, readOnlyBody = false, droppable = 
false, text = abc}
2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 - 
JmsReplyManagerTimeoutChecker[dest] TRACE 
org.apache.camel.component.jms.reply.CorrelationMap - Running purge task 
to see if any entries has been timed out
2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 - 
JmsReplyManagerTimeoutChecker[dest] TRACE 
org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the 
timeout map
2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG 
org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer 
- Consumer [ActiveMQMessageConsumer { 
value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of 
session [ActiveMQSession 
{id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not 
receive a message
2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE 
org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using 
MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG 
org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer 
- Received message of type [class 
org.apache.activemq.command.ActiveMQTextMessage] from consumer 
[ActiveMQMessageConsumer { 
value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of 
session [ActiveMQSession 
{id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG 
org.apache.camel.component.jms.reply.PersistentQueueReplyManager - 
Received reply message with correlationID: 
ID-rsi-eng-newsham-61454-1310158327407-0-4 -> ActiveMQTextMessage 
{commandId = 9, responseRequired = true, messageId = 
ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination = 
null, originalTransactionId = null, producerId = 
ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination = 
queue://replyQueue, transactionId = null, expiration = 0, timestamp = 
1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime 
= 1310158331077, correlationId = 
ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = null, persistent = 
true, type = null, priority = 4, groupID = null, groupSequence = 0, 
targetConsumerId = null, compressed = false, userID = null, content = 
null, marshalledProperties = null, dataStructure = null, 
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = 
true, readOnlyBody = true, droppable = false, text = reply}
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE 
org.apache.camel.component.jms.JmsBinding - Extracting body as a 
TextMessage from JMS message: ActiveMQTextMessage {commandId = 9, 
responseRequired = true, messageId = 
ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination = 
null, originalTransactionId = null, producerId = 
ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination = 
queue://replyQueue, transactionId = null, expiration = 0, timestamp = 
1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime 
= 1310158331077, correlationId = 
ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = null, persistent = 
true, type = null, priority = 4, groupID = null, groupSequence = 0, 
targetConsumerId = null, compressed = false, userID = null, content = 
null, marshalledProperties = null, dataStructure = null, 
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = 
true, readOnlyBody = true, droppable = false, text = reply}
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG 
org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply 
received. Setting reply as OUT message: reply
received reply in: 1.004 s

Thanks,
Jim



On 7/7/2011 10:59 PM, Claus Ibsen wrote:
> Can you enable TRACE logging at org.apache.camel.component.jms and run
> it for both examples.
> To see what happens.
>
>
>
> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jn...@referentia.com>  wrote:
>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
>> specify a custom replyTo destination on the endpoint url, the time it takes
>> for the producer to receive a reply increases drastically.  The curious
>> thing is that the time to receive a reply is almost exactly 1 second.  When
>> I remove the replyTo from the url, everything's fast again.
>>
>> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>>   There is a server class [4] which runs an embedded instance of ActiveMQ and
>> simply replies to messages as they arrive; and a client [3] class which
>> simply sends messages to the server, and prints the elapsed time.  The
>> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
>> value is added to the url or not.
>>
>> The client output when USE_REPLY_TO is false is shown as [1].  The client
>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
>> issue?
>>
>> Thanks!
>> Jim
>>
>>
>> [1] USE_REPLY_TO = false
>>
>> received reply in: 0.476 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> ...
>>
>>
>> [2] USE_REPLY_TO = true
>>
>> received reply in: 1.524 s
>> received reply in: 1.002 s
>> received reply in: 1.003 s
>> received reply in: 1.003 s
>> received reply in: 1.002 s
>> ...
>>
>>
>> [3] TestReplyToClient.java
>>
>> package test;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.ProducerTemplate;
>> import org.apache.camel.impl.DefaultCamelContext;
>>
>> public class TestReplyToClient {
>>
>>   private static final boolean USE_REPLY_TO = false;
>>
>>   public static void main(String... args) throws Exception {
>>     // create camel context; configure activemq component for
>> tcp://localhost:7001
>>     CamelContext context = new DefaultCamelContext();
>>     ActiveMQComponent activemqComponent =
>> ActiveMQComponent.activeMQComponent();
>>     activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>>       null, null, "tcp://localhost:7001"));
>>     context.addComponent("activemq", activemqComponent);
>>     context.start();
>>
>>     // define url to send requests to
>>     String sendUrl = "activemq:queue:dest";
>>     if (USE_REPLY_TO) {
>>       sendUrl += "?replyTo=replyQueue";
>>     }
>>     System.err.println("sending to url: " + sendUrl);
>>
>>     // repeatedly send requests; measure elapsed time
>>     ProducerTemplate template = context.createProducerTemplate();
>>     while (true) {
>>       long startNanos = System.nanoTime();
>>       template.requestBody(sendUrl, "abc");
>>       long elapsedNanos = System.nanoTime() - startNanos;
>>       System.err.println(String.format("received reply in: %.3f s",
>> elapsedNanos / 1000000000.0));
>>     }
>>   }
>>
>> }
>>
>>
>> [4] TestReplyToServer.java
>>
>> package test;
>>
>> import org.apache.activemq.broker.BrokerService;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.impl.DefaultCamelContext;
>>
>> public class TestReplyToServer {
>>
>>   private static final String BROKER_NAME = "thebroker";
>>
>>   public static void main(String... args) throws Exception {
>>     startBroker();
>>     startCamel();
>>     Thread.sleep(Long.MAX_VALUE);
>>   }
>>
>>   private static void startBroker() throws Exception {
>>     BrokerService brokerService = new BrokerService();
>>     brokerService.setBrokerName(BROKER_NAME);
>>     brokerService.setSchedulerSupport(false);
>>     brokerService.setPersistent(false);
>>     brokerService.addConnector("tcp://0.0.0.0:7001");
>>     brokerService.start();
>>     brokerService.waitUntilStarted();
>>   }
>>
>>
>>   private static void startCamel() throws Exception {
>>     CamelContext context = new DefaultCamelContext();
>>
>>     ActiveMQComponent activemqComponent =
>> ActiveMQComponent.activeMQComponent();
>>     activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>> BROKER_NAME));
>>     context.addComponent("activemq", activemqComponent);
>>
>>     final String receiveUrl = "activemq:queue:dest";
>>     context.addRoutes(new RouteBuilder() {
>>       @Override
>>       public void configure() throws Exception {
>>         from(receiveUrl).process(new Processor() {
>>           @Override
>>           public void process(Exchange exchange) throws Exception {
>>             System.err.println("received request");
>>             exchange.getOut().setBody("reply");
>>           }
>>         });
>>       }
>>     });
>>
>>     context.start();
>>     System.err.println("listening on url: " + receiveUrl);
>>   }
>>
>> }
>>
>>
>>
>>
>>
>
>


Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
Can you enable TRACE logging at org.apache.camel.component.jms and run
it for both examples.
To see what happens.



On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com> wrote:
>
> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
> specify a custom replyTo destination on the endpoint url, the time it takes
> for the producer to receive a reply increases drastically.  The curious
> thing is that the time to receive a reply is almost exactly 1 second.  When
> I remove the replyTo from the url, everything's fast again.
>
> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>  There is a server class [4] which runs an embedded instance of ActiveMQ and
> simply replies to messages as they arrive; and a client [3] class which
> simply sends messages to the server, and prints the elapsed time.  The
> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
> value is added to the url or not.
>
> The client output when USE_REPLY_TO is false is shown as [1].  The client
> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
> issue?
>
> Thanks!
> Jim
>
>
> [1] USE_REPLY_TO = false
>
> received reply in: 0.476 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> ...
>
>
> [2] USE_REPLY_TO = true
>
> received reply in: 1.524 s
> received reply in: 1.002 s
> received reply in: 1.003 s
> received reply in: 1.003 s
> received reply in: 1.002 s
> ...
>
>
> [3] TestReplyToClient.java
>
> package test;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToClient {
>
>  private static final boolean USE_REPLY_TO = false;
>
>  public static void main(String... args) throws Exception {
>    // create camel context; configure activemq component for
> tcp://localhost:7001
>    CamelContext context = new DefaultCamelContext();
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>      null, null, "tcp://localhost:7001"));
>    context.addComponent("activemq", activemqComponent);
>    context.start();
>
>    // define url to send requests to
>    String sendUrl = "activemq:queue:dest";
>    if (USE_REPLY_TO) {
>      sendUrl += "?replyTo=replyQueue";
>    }
>    System.err.println("sending to url: " + sendUrl);
>
>    // repeatedly send requests; measure elapsed time
>    ProducerTemplate template = context.createProducerTemplate();
>    while (true) {
>      long startNanos = System.nanoTime();
>      template.requestBody(sendUrl, "abc");
>      long elapsedNanos = System.nanoTime() - startNanos;
>      System.err.println(String.format("received reply in: %.3f s",
> elapsedNanos / 1000000000.0));
>    }
>  }
>
> }
>
>
> [4] TestReplyToServer.java
>
> package test;
>
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToServer {
>
>  private static final String BROKER_NAME = "thebroker";
>
>  public static void main(String... args) throws Exception {
>    startBroker();
>    startCamel();
>    Thread.sleep(Long.MAX_VALUE);
>  }
>
>  private static void startBroker() throws Exception {
>    BrokerService brokerService = new BrokerService();
>    brokerService.setBrokerName(BROKER_NAME);
>    brokerService.setSchedulerSupport(false);
>    brokerService.setPersistent(false);
>    brokerService.addConnector("tcp://0.0.0.0:7001");
>    brokerService.start();
>    brokerService.waitUntilStarted();
>  }
>
>
>  private static void startCamel() throws Exception {
>    CamelContext context = new DefaultCamelContext();
>
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> BROKER_NAME));
>    context.addComponent("activemq", activemqComponent);
>
>    final String receiveUrl = "activemq:queue:dest";
>    context.addRoutes(new RouteBuilder() {
>      @Override
>      public void configure() throws Exception {
>        from(receiveUrl).process(new Processor() {
>          @Override
>          public void process(Exchange exchange) throws Exception {
>            System.err.println("received request");
>            exchange.getOut().setBody("reply");
>          }
>        });
>      }
>    });
>
>    context.start();
>    System.err.println("listening on url: " + receiveUrl);
>  }
>
> }
>
>
>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by David Karlsen <da...@gmail.com>.
Because each receiver listens for a distinct correclationid this is not a
problem.

2011/8/14 Claus Ibsen <cl...@gmail.com>

> On Sat, Aug 13, 2011 at 8:06 PM, David Karlsen <da...@gmail.com>
> wrote:
> > Hi - I would be very interested in this (and willing to test).
> > We had to back out from using camel for our req/res jms (we now use
> > jmstemplate directly) - especially because the persistentrelyqueuemanager
> > (or something like that) seemed to use the same connection - so
> eventually
> > the fw closed that connection during hours with little traffic.
> >
> > Another positive effect of switching to using jmstemplate directly is
> that
> > the same thread is handling the response and the request - and we have a
> > thread scoped state object attached to it.
> >
> > We are running in a cluster - but because we use correlationid this is
> not a
> > problem as you say.
> >
>
> btw since you run in a clustered environment. Do you use the same
> persistent reply queues for all nodes in the cluster?
> If so how is your strategy to ensure the reply messages get consumed
> by the right node?
>
>
> > The underlying MOM is WebSphere MQ.
> >
> > BTW: "vinning" in my rely was supposted to spell "common" - I blame my
> phone
> > for that ;-)
> >
> > 2011/8/13 Claus Ibsen <cl...@gmail.com>
> >
> >> Hi Jim, others
> >>
> >> I got some code for supporting exclusive replyTo queues. I just want
> >> to know if you run in a clustered environment.
> >> As the replyTo queue would be exclusive to each CamelContext (eg each
> >> node). So if you have 2 nodes running with Camel then each node would
> >> have to use an unique replyTo queue.
> >>
> >> If you run in a single instance of CamelContext then there is of
> >> course no problem.
> >>
> >> Alternative you can always use shared queues in a cluster, as each
> >> CamelContext (each node) will only pickup intended reply message for
> >> it (using a JMS selector), which is also the case why its slower. You
> >> can tweak the performance with the receiveTimeout. That is default 1
> >> sec. If you set it lower, then it will potential react faster to new
> >> replies coming back.
> >>
> >>
> >>
> >> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com>
> >> wrote:
> >> >
> >> > I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
> I
> >> > specify a custom replyTo destination on the endpoint url, the time it
> >> takes
> >> > for the producer to receive a reply increases drastically.  The
> curious
> >> > thing is that the time to receive a reply is almost exactly 1 second.
> >>  When
> >> > I remove the replyTo from the url, everything's fast again.
> >> >
> >> > I created a very simple, stand-alone test to demonstrate what I'm
> seeing.
> >> >  There is a server class [4] which runs an embedded instance of
> ActiveMQ
> >> and
> >> > simply replies to messages as they arrive; and a client [3] class
> which
> >> > simply sends messages to the server, and prints the elapsed time.  The
> >> > USE_REPLY_TO symbolic constant in the client determines whether a
> replyTo
> >> > value is added to the url or not.
> >> >
> >> > The client output when USE_REPLY_TO is false is shown as [1].  The
> client
> >> > output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> >> > trivial.  Am I doing something wrong, or is this a Camel and/or
> ActiveMQ
> >> > issue?
> >> >
> >> > Thanks!
> >> > Jim
> >> >
> >> >
> >> > [1] USE_REPLY_TO = false
> >> >
> >> > received reply in: 0.476 s
> >> > received reply in: 0.006 s
> >> > received reply in: 0.006 s
> >> > received reply in: 0.006 s
> >> > received reply in: 0.006 s
> >> > ...
> >> >
> >> >
> >> > [2] USE_REPLY_TO = true
> >> >
> >> > received reply in: 1.524 s
> >> > received reply in: 1.002 s
> >> > received reply in: 1.003 s
> >> > received reply in: 1.003 s
> >> > received reply in: 1.002 s
> >> > ...
> >> >
> >> >
> >> > [3] TestReplyToClient.java
> >> >
> >> > package test;
> >> >
> >> > import org.apache.activemq.ActiveMQConnectionFactory;
> >> > import org.apache.activemq.camel.component.ActiveMQComponent;
> >> > import org.apache.camel.CamelContext;
> >> > import org.apache.camel.ProducerTemplate;
> >> > import org.apache.camel.impl.DefaultCamelContext;
> >> >
> >> > public class TestReplyToClient {
> >> >
> >> >  private static final boolean USE_REPLY_TO = false;
> >> >
> >> >  public static void main(String... args) throws Exception {
> >> >    // create camel context; configure activemq component for
> >> > tcp://localhost:7001
> >> >    CamelContext context = new DefaultCamelContext();
> >> >    ActiveMQComponent activemqComponent =
> >> > ActiveMQComponent.activeMQComponent();
> >> >    activemqComponent.setConnectionFactory(new
> ActiveMQConnectionFactory(
> >> >      null, null, "tcp://localhost:7001"));
> >> >    context.addComponent("activemq", activemqComponent);
> >> >    context.start();
> >> >
> >> >    // define url to send requests to
> >> >    String sendUrl = "activemq:queue:dest";
> >> >    if (USE_REPLY_TO) {
> >> >      sendUrl += "?replyTo=replyQueue";
> >> >    }
> >> >    System.err.println("sending to url: " + sendUrl);
> >> >
> >> >    // repeatedly send requests; measure elapsed time
> >> >    ProducerTemplate template = context.createProducerTemplate();
> >> >    while (true) {
> >> >      long startNanos = System.nanoTime();
> >> >      template.requestBody(sendUrl, "abc");
> >> >      long elapsedNanos = System.nanoTime() - startNanos;
> >> >      System.err.println(String.format("received reply in: %.3f s",
> >> > elapsedNanos / 1000000000.0));
> >> >    }
> >> >  }
> >> >
> >> > }
> >> >
> >> >
> >> > [4] TestReplyToServer.java
> >> >
> >> > package test;
> >> >
> >> > import org.apache.activemq.broker.BrokerService;
> >> > import org.apache.activemq.camel.component.ActiveMQComponent;
> >> > import org.apache.camel.CamelContext;
> >> > import org.apache.camel.Exchange;
> >> > import org.apache.camel.Processor;
> >> > import org.apache.camel.builder.RouteBuilder;
> >> > import org.apache.camel.impl.DefaultCamelContext;
> >> >
> >> > public class TestReplyToServer {
> >> >
> >> >  private static final String BROKER_NAME = "thebroker";
> >> >
> >> >  public static void main(String... args) throws Exception {
> >> >    startBroker();
> >> >    startCamel();
> >> >    Thread.sleep(Long.MAX_VALUE);
> >> >  }
> >> >
> >> >  private static void startBroker() throws Exception {
> >> >    BrokerService brokerService = new BrokerService();
> >> >    brokerService.setBrokerName(BROKER_NAME);
> >> >    brokerService.setSchedulerSupport(false);
> >> >    brokerService.setPersistent(false);
> >> >    brokerService.addConnector("tcp://0.0.0.0:7001");
> >> >    brokerService.start();
> >> >    brokerService.waitUntilStarted();
> >> >  }
> >> >
> >> >
> >> >  private static void startCamel() throws Exception {
> >> >    CamelContext context = new DefaultCamelContext();
> >> >
> >> >    ActiveMQComponent activemqComponent =
> >> > ActiveMQComponent.activeMQComponent();
> >> >
>  activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> >> > BROKER_NAME));
> >> >    context.addComponent("activemq", activemqComponent);
> >> >
> >> >    final String receiveUrl = "activemq:queue:dest";
> >> >    context.addRoutes(new RouteBuilder() {
> >> >      @Override
> >> >      public void configure() throws Exception {
> >> >        from(receiveUrl).process(new Processor() {
> >> >          @Override
> >> >          public void process(Exchange exchange) throws Exception {
> >> >            System.err.println("received request");
> >> >            exchange.getOut().setBody("reply");
> >> >          }
> >> >        });
> >> >      }
> >> >    });
> >> >
> >> >    context.start();
> >> >    System.err.println("listening on url: " + receiveUrl);
> >> >  }
> >> >
> >> > }
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> FuseSource
> >> Email: cibsen@fusesource.com
> >> Web: http://fusesource.com
> >> Twitter: davsclaus, fusenews
> >> Blog: http://davsclaus.blogspot.com/
> >> Author of Camel in Action: http://www.manning.com/ibsen/
> >>
> >
> >
> >
> > --
> > --
> > David J. M. Karlsen - http://www.linkedin.com/in/davidkarlsen
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>



-- 
--
David J. M. Karlsen - http://www.linkedin.com/in/davidkarlsen

Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
On Sat, Aug 13, 2011 at 8:06 PM, David Karlsen <da...@gmail.com> wrote:
> Hi - I would be very interested in this (and willing to test).
> We had to back out from using camel for our req/res jms (we now use
> jmstemplate directly) - especially because the persistentrelyqueuemanager
> (or something like that) seemed to use the same connection - so eventually
> the fw closed that connection during hours with little traffic.
>
> Another positive effect of switching to using jmstemplate directly is that
> the same thread is handling the response and the request - and we have a
> thread scoped state object attached to it.
>
> We are running in a cluster - but because we use correlationid this is not a
> problem as you say.
>

btw since you run in a clustered environment. Do you use the same
persistent reply queues for all nodes in the cluster?
If so how is your strategy to ensure the reply messages get consumed
by the right node?


> The underlying MOM is WebSphere MQ.
>
> BTW: "vinning" in my rely was supposted to spell "common" - I blame my phone
> for that ;-)
>
> 2011/8/13 Claus Ibsen <cl...@gmail.com>
>
>> Hi Jim, others
>>
>> I got some code for supporting exclusive replyTo queues. I just want
>> to know if you run in a clustered environment.
>> As the replyTo queue would be exclusive to each CamelContext (eg each
>> node). So if you have 2 nodes running with Camel then each node would
>> have to use an unique replyTo queue.
>>
>> If you run in a single instance of CamelContext then there is of
>> course no problem.
>>
>> Alternative you can always use shared queues in a cluster, as each
>> CamelContext (each node) will only pickup intended reply message for
>> it (using a JMS selector), which is also the case why its slower. You
>> can tweak the performance with the receiveTimeout. That is default 1
>> sec. If you set it lower, then it will potential react faster to new
>> replies coming back.
>>
>>
>>
>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com>
>> wrote:
>> >
>> > I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
>> > specify a custom replyTo destination on the endpoint url, the time it
>> takes
>> > for the producer to receive a reply increases drastically.  The curious
>> > thing is that the time to receive a reply is almost exactly 1 second.
>>  When
>> > I remove the replyTo from the url, everything's fast again.
>> >
>> > I created a very simple, stand-alone test to demonstrate what I'm seeing.
>> >  There is a server class [4] which runs an embedded instance of ActiveMQ
>> and
>> > simply replies to messages as they arrive; and a client [3] class which
>> > simply sends messages to the server, and prints the elapsed time.  The
>> > USE_REPLY_TO symbolic constant in the client determines whether a replyTo
>> > value is added to the url or not.
>> >
>> > The client output when USE_REPLY_TO is false is shown as [1].  The client
>> > output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>> > trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
>> > issue?
>> >
>> > Thanks!
>> > Jim
>> >
>> >
>> > [1] USE_REPLY_TO = false
>> >
>> > received reply in: 0.476 s
>> > received reply in: 0.006 s
>> > received reply in: 0.006 s
>> > received reply in: 0.006 s
>> > received reply in: 0.006 s
>> > ...
>> >
>> >
>> > [2] USE_REPLY_TO = true
>> >
>> > received reply in: 1.524 s
>> > received reply in: 1.002 s
>> > received reply in: 1.003 s
>> > received reply in: 1.003 s
>> > received reply in: 1.002 s
>> > ...
>> >
>> >
>> > [3] TestReplyToClient.java
>> >
>> > package test;
>> >
>> > import org.apache.activemq.ActiveMQConnectionFactory;
>> > import org.apache.activemq.camel.component.ActiveMQComponent;
>> > import org.apache.camel.CamelContext;
>> > import org.apache.camel.ProducerTemplate;
>> > import org.apache.camel.impl.DefaultCamelContext;
>> >
>> > public class TestReplyToClient {
>> >
>> >  private static final boolean USE_REPLY_TO = false;
>> >
>> >  public static void main(String... args) throws Exception {
>> >    // create camel context; configure activemq component for
>> > tcp://localhost:7001
>> >    CamelContext context = new DefaultCamelContext();
>> >    ActiveMQComponent activemqComponent =
>> > ActiveMQComponent.activeMQComponent();
>> >    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>> >      null, null, "tcp://localhost:7001"));
>> >    context.addComponent("activemq", activemqComponent);
>> >    context.start();
>> >
>> >    // define url to send requests to
>> >    String sendUrl = "activemq:queue:dest";
>> >    if (USE_REPLY_TO) {
>> >      sendUrl += "?replyTo=replyQueue";
>> >    }
>> >    System.err.println("sending to url: " + sendUrl);
>> >
>> >    // repeatedly send requests; measure elapsed time
>> >    ProducerTemplate template = context.createProducerTemplate();
>> >    while (true) {
>> >      long startNanos = System.nanoTime();
>> >      template.requestBody(sendUrl, "abc");
>> >      long elapsedNanos = System.nanoTime() - startNanos;
>> >      System.err.println(String.format("received reply in: %.3f s",
>> > elapsedNanos / 1000000000.0));
>> >    }
>> >  }
>> >
>> > }
>> >
>> >
>> > [4] TestReplyToServer.java
>> >
>> > package test;
>> >
>> > import org.apache.activemq.broker.BrokerService;
>> > import org.apache.activemq.camel.component.ActiveMQComponent;
>> > import org.apache.camel.CamelContext;
>> > import org.apache.camel.Exchange;
>> > import org.apache.camel.Processor;
>> > import org.apache.camel.builder.RouteBuilder;
>> > import org.apache.camel.impl.DefaultCamelContext;
>> >
>> > public class TestReplyToServer {
>> >
>> >  private static final String BROKER_NAME = "thebroker";
>> >
>> >  public static void main(String... args) throws Exception {
>> >    startBroker();
>> >    startCamel();
>> >    Thread.sleep(Long.MAX_VALUE);
>> >  }
>> >
>> >  private static void startBroker() throws Exception {
>> >    BrokerService brokerService = new BrokerService();
>> >    brokerService.setBrokerName(BROKER_NAME);
>> >    brokerService.setSchedulerSupport(false);
>> >    brokerService.setPersistent(false);
>> >    brokerService.addConnector("tcp://0.0.0.0:7001");
>> >    brokerService.start();
>> >    brokerService.waitUntilStarted();
>> >  }
>> >
>> >
>> >  private static void startCamel() throws Exception {
>> >    CamelContext context = new DefaultCamelContext();
>> >
>> >    ActiveMQComponent activemqComponent =
>> > ActiveMQComponent.activeMQComponent();
>> >    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>> > BROKER_NAME));
>> >    context.addComponent("activemq", activemqComponent);
>> >
>> >    final String receiveUrl = "activemq:queue:dest";
>> >    context.addRoutes(new RouteBuilder() {
>> >      @Override
>> >      public void configure() throws Exception {
>> >        from(receiveUrl).process(new Processor() {
>> >          @Override
>> >          public void process(Exchange exchange) throws Exception {
>> >            System.err.println("received request");
>> >            exchange.getOut().setBody("reply");
>> >          }
>> >        });
>> >      }
>> >    });
>> >
>> >    context.start();
>> >    System.err.println("listening on url: " + receiveUrl);
>> >  }
>> >
>> > }
>> >
>> >
>> >
>> >
>> >
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cibsen@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>>
>
>
>
> --
> --
> David J. M. Karlsen - http://www.linkedin.com/in/davidkarlsen
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by David Karlsen <da...@gmail.com>.
Hi.

Yes - agree that this causes more blocking.
I do not share any information between the threads as the returned messageId
from the request is used for the corresponding receive with the generated
messageId as correlationId.
Each receive will use a fresh connection from the pool which is hasn't been
cut by the firewall.

2011/8/14 Claus Ibsen <cl...@gmail.com>

> On Sat, Aug 13, 2011 at 8:06 PM, David Karlsen <da...@gmail.com>
> wrote:
> > Hi - I would be very interested in this (and willing to test).
> > We had to back out from using camel for our req/res jms (we now use
> > jmstemplate directly) - especially because the persistentrelyqueuemanager
> > (or something like that) seemed to use the same connection - so
> eventually
> > the fw closed that connection during hours with little traffic.
>
> The reply manager uses the default message listener which supports
> automatic reconnection.
> That was not the case in older Camel releases, so you may have hit
> that issue back then.
>
>
> >
> > Another positive effect of switching to using jmstemplate directly is
> that
> > the same thread is handling the response and the request - and we have a
> > thread scoped state object attached to it.
> >
>
> You can store information on the Exchange then its kept safe for async
> messaging / thread context switches etc.
>
>
> > We are running in a cluster - but because we use correlationid this is
> not a
> > problem as you say.
> >
>
> So if you use a jmstemplate to do request/reply then you are not as
> efficient as you will have threads blocking.
> And as well having multiple threads racing for replies. So if you have
> like 5 concurrent request/replies going on, then
> you have 5 threads using the jmstemplate, and all 5 is also active
> listening for the replies on the queue.
> And you then use a shared correlation map to ensure each of those 5
> gets the intended reply.
>
> Is that correct?
>
>
>
>
>
>
> > The underlying MOM is WebSphere MQ.
> >
> > BTW: "vinning" in my rely was supposted to spell "common" - I blame my
> phone
> > for that ;-)
> >
> > 2011/8/13 Claus Ibsen <cl...@gmail.com>
> >
> >> Hi Jim, others
> >>
> >> I got some code for supporting exclusive replyTo queues. I just want
> >> to know if you run in a clustered environment.
> >> As the replyTo queue would be exclusive to each CamelContext (eg each
> >> node). So if you have 2 nodes running with Camel then each node would
> >> have to use an unique replyTo queue.
> >>
> >> If you run in a single instance of CamelContext then there is of
> >> course no problem.
> >>
> >> Alternative you can always use shared queues in a cluster, as each
> >> CamelContext (each node) will only pickup intended reply message for
> >> it (using a JMS selector), which is also the case why its slower. You
> >> can tweak the performance with the receiveTimeout. That is default 1
> >> sec. If you set it lower, then it will potential react faster to new
> >> replies coming back.
> >>
> >>
> >>
> >> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com>
> >> wrote:
> >> >
> >> > I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
> I
> >> > specify a custom replyTo destination on the endpoint url, the time it
> >> takes
> >> > for the producer to receive a reply increases drastically.  The
> curious
> >> > thing is that the time to receive a reply is almost exactly 1 second.
> >>  When
> >> > I remove the replyTo from the url, everything's fast again.
> >> >
> >> > I created a very simple, stand-alone test to demonstrate what I'm
> seeing.
> >> >  There is a server class [4] which runs an embedded instance of
> ActiveMQ
> >> and
> >> > simply replies to messages as they arrive; and a client [3] class
> which
> >> > simply sends messages to the server, and prints the elapsed time.  The
> >> > USE_REPLY_TO symbolic constant in the client determines whether a
> replyTo
> >> > value is added to the url or not.
> >> >
> >> > The client output when USE_REPLY_TO is false is shown as [1].  The
> client
> >> > output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> >> > trivial.  Am I doing something wrong, or is this a Camel and/or
> ActiveMQ
> >> > issue?
> >> >
> >> > Thanks!
> >> > Jim
> >> >
> >> >
> >> > [1] USE_REPLY_TO = false
> >> >
> >> > received reply in: 0.476 s
> >> > received reply in: 0.006 s
> >> > received reply in: 0.006 s
> >> > received reply in: 0.006 s
> >> > received reply in: 0.006 s
> >> > ...
> >> >
> >> >
> >> > [2] USE_REPLY_TO = true
> >> >
> >> > received reply in: 1.524 s
> >> > received reply in: 1.002 s
> >> > received reply in: 1.003 s
> >> > received reply in: 1.003 s
> >> > received reply in: 1.002 s
> >> > ...
> >> >
> >> >
> >> > [3] TestReplyToClient.java
> >> >
> >> > package test;
> >> >
> >> > import org.apache.activemq.ActiveMQConnectionFactory;
> >> > import org.apache.activemq.camel.component.ActiveMQComponent;
> >> > import org.apache.camel.CamelContext;
> >> > import org.apache.camel.ProducerTemplate;
> >> > import org.apache.camel.impl.DefaultCamelContext;
> >> >
> >> > public class TestReplyToClient {
> >> >
> >> >  private static final boolean USE_REPLY_TO = false;
> >> >
> >> >  public static void main(String... args) throws Exception {
> >> >    // create camel context; configure activemq component for
> >> > tcp://localhost:7001
> >> >    CamelContext context = new DefaultCamelContext();
> >> >    ActiveMQComponent activemqComponent =
> >> > ActiveMQComponent.activeMQComponent();
> >> >    activemqComponent.setConnectionFactory(new
> ActiveMQConnectionFactory(
> >> >      null, null, "tcp://localhost:7001"));
> >> >    context.addComponent("activemq", activemqComponent);
> >> >    context.start();
> >> >
> >> >    // define url to send requests to
> >> >    String sendUrl = "activemq:queue:dest";
> >> >    if (USE_REPLY_TO) {
> >> >      sendUrl += "?replyTo=replyQueue";
> >> >    }
> >> >    System.err.println("sending to url: " + sendUrl);
> >> >
> >> >    // repeatedly send requests; measure elapsed time
> >> >    ProducerTemplate template = context.createProducerTemplate();
> >> >    while (true) {
> >> >      long startNanos = System.nanoTime();
> >> >      template.requestBody(sendUrl, "abc");
> >> >      long elapsedNanos = System.nanoTime() - startNanos;
> >> >      System.err.println(String.format("received reply in: %.3f s",
> >> > elapsedNanos / 1000000000.0));
> >> >    }
> >> >  }
> >> >
> >> > }
> >> >
> >> >
> >> > [4] TestReplyToServer.java
> >> >
> >> > package test;
> >> >
> >> > import org.apache.activemq.broker.BrokerService;
> >> > import org.apache.activemq.camel.component.ActiveMQComponent;
> >> > import org.apache.camel.CamelContext;
> >> > import org.apache.camel.Exchange;
> >> > import org.apache.camel.Processor;
> >> > import org.apache.camel.builder.RouteBuilder;
> >> > import org.apache.camel.impl.DefaultCamelContext;
> >> >
> >> > public class TestReplyToServer {
> >> >
> >> >  private static final String BROKER_NAME = "thebroker";
> >> >
> >> >  public static void main(String... args) throws Exception {
> >> >    startBroker();
> >> >    startCamel();
> >> >    Thread.sleep(Long.MAX_VALUE);
> >> >  }
> >> >
> >> >  private static void startBroker() throws Exception {
> >> >    BrokerService brokerService = new BrokerService();
> >> >    brokerService.setBrokerName(BROKER_NAME);
> >> >    brokerService.setSchedulerSupport(false);
> >> >    brokerService.setPersistent(false);
> >> >    brokerService.addConnector("tcp://0.0.0.0:7001");
> >> >    brokerService.start();
> >> >    brokerService.waitUntilStarted();
> >> >  }
> >> >
> >> >
> >> >  private static void startCamel() throws Exception {
> >> >    CamelContext context = new DefaultCamelContext();
> >> >
> >> >    ActiveMQComponent activemqComponent =
> >> > ActiveMQComponent.activeMQComponent();
> >> >
>  activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> >> > BROKER_NAME));
> >> >    context.addComponent("activemq", activemqComponent);
> >> >
> >> >    final String receiveUrl = "activemq:queue:dest";
> >> >    context.addRoutes(new RouteBuilder() {
> >> >      @Override
> >> >      public void configure() throws Exception {
> >> >        from(receiveUrl).process(new Processor() {
> >> >          @Override
> >> >          public void process(Exchange exchange) throws Exception {
> >> >            System.err.println("received request");
> >> >            exchange.getOut().setBody("reply");
> >> >          }
> >> >        });
> >> >      }
> >> >    });
> >> >
> >> >    context.start();
> >> >    System.err.println("listening on url: " + receiveUrl);
> >> >  }
> >> >
> >> > }
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> FuseSource
> >> Email: cibsen@fusesource.com
> >> Web: http://fusesource.com
> >> Twitter: davsclaus, fusenews
> >> Blog: http://davsclaus.blogspot.com/
> >> Author of Camel in Action: http://www.manning.com/ibsen/
> >>
> >
> >
> >
> > --
> > --
> > David J. M. Karlsen - http://www.linkedin.com/in/davidkarlsen
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>



-- 
--
David J. M. Karlsen - http://www.linkedin.com/in/davidkarlsen

Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
On Sat, Aug 13, 2011 at 8:06 PM, David Karlsen <da...@gmail.com> wrote:
> Hi - I would be very interested in this (and willing to test).
> We had to back out from using camel for our req/res jms (we now use
> jmstemplate directly) - especially because the persistentrelyqueuemanager
> (or something like that) seemed to use the same connection - so eventually
> the fw closed that connection during hours with little traffic.

The reply manager uses the default message listener which supports
automatic reconnection.
That was not the case in older Camel releases, so you may have hit
that issue back then.


>
> Another positive effect of switching to using jmstemplate directly is that
> the same thread is handling the response and the request - and we have a
> thread scoped state object attached to it.
>

You can store information on the Exchange then its kept safe for async
messaging / thread context switches etc.


> We are running in a cluster - but because we use correlationid this is not a
> problem as you say.
>

So if you use a jmstemplate to do request/reply then you are not as
efficient as you will have threads blocking.
And as well having multiple threads racing for replies. So if you have
like 5 concurrent request/replies going on, then
you have 5 threads using the jmstemplate, and all 5 is also active
listening for the replies on the queue.
And you then use a shared correlation map to ensure each of those 5
gets the intended reply.

Is that correct?






> The underlying MOM is WebSphere MQ.
>
> BTW: "vinning" in my rely was supposted to spell "common" - I blame my phone
> for that ;-)
>
> 2011/8/13 Claus Ibsen <cl...@gmail.com>
>
>> Hi Jim, others
>>
>> I got some code for supporting exclusive replyTo queues. I just want
>> to know if you run in a clustered environment.
>> As the replyTo queue would be exclusive to each CamelContext (eg each
>> node). So if you have 2 nodes running with Camel then each node would
>> have to use an unique replyTo queue.
>>
>> If you run in a single instance of CamelContext then there is of
>> course no problem.
>>
>> Alternative you can always use shared queues in a cluster, as each
>> CamelContext (each node) will only pickup intended reply message for
>> it (using a JMS selector), which is also the case why its slower. You
>> can tweak the performance with the receiveTimeout. That is default 1
>> sec. If you set it lower, then it will potential react faster to new
>> replies coming back.
>>
>>
>>
>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com>
>> wrote:
>> >
>> > I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
>> > specify a custom replyTo destination on the endpoint url, the time it
>> takes
>> > for the producer to receive a reply increases drastically.  The curious
>> > thing is that the time to receive a reply is almost exactly 1 second.
>>  When
>> > I remove the replyTo from the url, everything's fast again.
>> >
>> > I created a very simple, stand-alone test to demonstrate what I'm seeing.
>> >  There is a server class [4] which runs an embedded instance of ActiveMQ
>> and
>> > simply replies to messages as they arrive; and a client [3] class which
>> > simply sends messages to the server, and prints the elapsed time.  The
>> > USE_REPLY_TO symbolic constant in the client determines whether a replyTo
>> > value is added to the url or not.
>> >
>> > The client output when USE_REPLY_TO is false is shown as [1].  The client
>> > output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>> > trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
>> > issue?
>> >
>> > Thanks!
>> > Jim
>> >
>> >
>> > [1] USE_REPLY_TO = false
>> >
>> > received reply in: 0.476 s
>> > received reply in: 0.006 s
>> > received reply in: 0.006 s
>> > received reply in: 0.006 s
>> > received reply in: 0.006 s
>> > ...
>> >
>> >
>> > [2] USE_REPLY_TO = true
>> >
>> > received reply in: 1.524 s
>> > received reply in: 1.002 s
>> > received reply in: 1.003 s
>> > received reply in: 1.003 s
>> > received reply in: 1.002 s
>> > ...
>> >
>> >
>> > [3] TestReplyToClient.java
>> >
>> > package test;
>> >
>> > import org.apache.activemq.ActiveMQConnectionFactory;
>> > import org.apache.activemq.camel.component.ActiveMQComponent;
>> > import org.apache.camel.CamelContext;
>> > import org.apache.camel.ProducerTemplate;
>> > import org.apache.camel.impl.DefaultCamelContext;
>> >
>> > public class TestReplyToClient {
>> >
>> >  private static final boolean USE_REPLY_TO = false;
>> >
>> >  public static void main(String... args) throws Exception {
>> >    // create camel context; configure activemq component for
>> > tcp://localhost:7001
>> >    CamelContext context = new DefaultCamelContext();
>> >    ActiveMQComponent activemqComponent =
>> > ActiveMQComponent.activeMQComponent();
>> >    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>> >      null, null, "tcp://localhost:7001"));
>> >    context.addComponent("activemq", activemqComponent);
>> >    context.start();
>> >
>> >    // define url to send requests to
>> >    String sendUrl = "activemq:queue:dest";
>> >    if (USE_REPLY_TO) {
>> >      sendUrl += "?replyTo=replyQueue";
>> >    }
>> >    System.err.println("sending to url: " + sendUrl);
>> >
>> >    // repeatedly send requests; measure elapsed time
>> >    ProducerTemplate template = context.createProducerTemplate();
>> >    while (true) {
>> >      long startNanos = System.nanoTime();
>> >      template.requestBody(sendUrl, "abc");
>> >      long elapsedNanos = System.nanoTime() - startNanos;
>> >      System.err.println(String.format("received reply in: %.3f s",
>> > elapsedNanos / 1000000000.0));
>> >    }
>> >  }
>> >
>> > }
>> >
>> >
>> > [4] TestReplyToServer.java
>> >
>> > package test;
>> >
>> > import org.apache.activemq.broker.BrokerService;
>> > import org.apache.activemq.camel.component.ActiveMQComponent;
>> > import org.apache.camel.CamelContext;
>> > import org.apache.camel.Exchange;
>> > import org.apache.camel.Processor;
>> > import org.apache.camel.builder.RouteBuilder;
>> > import org.apache.camel.impl.DefaultCamelContext;
>> >
>> > public class TestReplyToServer {
>> >
>> >  private static final String BROKER_NAME = "thebroker";
>> >
>> >  public static void main(String... args) throws Exception {
>> >    startBroker();
>> >    startCamel();
>> >    Thread.sleep(Long.MAX_VALUE);
>> >  }
>> >
>> >  private static void startBroker() throws Exception {
>> >    BrokerService brokerService = new BrokerService();
>> >    brokerService.setBrokerName(BROKER_NAME);
>> >    brokerService.setSchedulerSupport(false);
>> >    brokerService.setPersistent(false);
>> >    brokerService.addConnector("tcp://0.0.0.0:7001");
>> >    brokerService.start();
>> >    brokerService.waitUntilStarted();
>> >  }
>> >
>> >
>> >  private static void startCamel() throws Exception {
>> >    CamelContext context = new DefaultCamelContext();
>> >
>> >    ActiveMQComponent activemqComponent =
>> > ActiveMQComponent.activeMQComponent();
>> >    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>> > BROKER_NAME));
>> >    context.addComponent("activemq", activemqComponent);
>> >
>> >    final String receiveUrl = "activemq:queue:dest";
>> >    context.addRoutes(new RouteBuilder() {
>> >      @Override
>> >      public void configure() throws Exception {
>> >        from(receiveUrl).process(new Processor() {
>> >          @Override
>> >          public void process(Exchange exchange) throws Exception {
>> >            System.err.println("received request");
>> >            exchange.getOut().setBody("reply");
>> >          }
>> >        });
>> >      }
>> >    });
>> >
>> >    context.start();
>> >    System.err.println("listening on url: " + receiveUrl);
>> >  }
>> >
>> > }
>> >
>> >
>> >
>> >
>> >
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cibsen@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>>
>
>
>
> --
> --
> David J. M. Karlsen - http://www.linkedin.com/in/davidkarlsen
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by David Karlsen <da...@gmail.com>.
Hi - I would be very interested in this (and willing to test).
We had to back out from using camel for our req/res jms (we now use
jmstemplate directly) - especially because the persistentrelyqueuemanager
(or something like that) seemed to use the same connection - so eventually
the fw closed that connection during hours with little traffic.

Another positive effect of switching to using jmstemplate directly is that
the same thread is handling the response and the request - and we have a
thread scoped state object attached to it.

We are running in a cluster - but because we use correlationid this is not a
problem as you say.

The underlying MOM is WebSphere MQ.

BTW: "vinning" in my rely was supposted to spell "common" - I blame my phone
for that ;-)

2011/8/13 Claus Ibsen <cl...@gmail.com>

> Hi Jim, others
>
> I got some code for supporting exclusive replyTo queues. I just want
> to know if you run in a clustered environment.
> As the replyTo queue would be exclusive to each CamelContext (eg each
> node). So if you have 2 nodes running with Camel then each node would
> have to use an unique replyTo queue.
>
> If you run in a single instance of CamelContext then there is of
> course no problem.
>
> Alternative you can always use shared queues in a cluster, as each
> CamelContext (each node) will only pickup intended reply message for
> it (using a JMS selector), which is also the case why its slower. You
> can tweak the performance with the receiveTimeout. That is default 1
> sec. If you set it lower, then it will potential react faster to new
> replies coming back.
>
>
>
> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com>
> wrote:
> >
> > I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
> > specify a custom replyTo destination on the endpoint url, the time it
> takes
> > for the producer to receive a reply increases drastically.  The curious
> > thing is that the time to receive a reply is almost exactly 1 second.
>  When
> > I remove the replyTo from the url, everything's fast again.
> >
> > I created a very simple, stand-alone test to demonstrate what I'm seeing.
> >  There is a server class [4] which runs an embedded instance of ActiveMQ
> and
> > simply replies to messages as they arrive; and a client [3] class which
> > simply sends messages to the server, and prints the elapsed time.  The
> > USE_REPLY_TO symbolic constant in the client determines whether a replyTo
> > value is added to the url or not.
> >
> > The client output when USE_REPLY_TO is false is shown as [1].  The client
> > output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> > trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
> > issue?
> >
> > Thanks!
> > Jim
> >
> >
> > [1] USE_REPLY_TO = false
> >
> > received reply in: 0.476 s
> > received reply in: 0.006 s
> > received reply in: 0.006 s
> > received reply in: 0.006 s
> > received reply in: 0.006 s
> > ...
> >
> >
> > [2] USE_REPLY_TO = true
> >
> > received reply in: 1.524 s
> > received reply in: 1.002 s
> > received reply in: 1.003 s
> > received reply in: 1.003 s
> > received reply in: 1.002 s
> > ...
> >
> >
> > [3] TestReplyToClient.java
> >
> > package test;
> >
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.camel.component.ActiveMQComponent;
> > import org.apache.camel.CamelContext;
> > import org.apache.camel.ProducerTemplate;
> > import org.apache.camel.impl.DefaultCamelContext;
> >
> > public class TestReplyToClient {
> >
> >  private static final boolean USE_REPLY_TO = false;
> >
> >  public static void main(String... args) throws Exception {
> >    // create camel context; configure activemq component for
> > tcp://localhost:7001
> >    CamelContext context = new DefaultCamelContext();
> >    ActiveMQComponent activemqComponent =
> > ActiveMQComponent.activeMQComponent();
> >    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
> >      null, null, "tcp://localhost:7001"));
> >    context.addComponent("activemq", activemqComponent);
> >    context.start();
> >
> >    // define url to send requests to
> >    String sendUrl = "activemq:queue:dest";
> >    if (USE_REPLY_TO) {
> >      sendUrl += "?replyTo=replyQueue";
> >    }
> >    System.err.println("sending to url: " + sendUrl);
> >
> >    // repeatedly send requests; measure elapsed time
> >    ProducerTemplate template = context.createProducerTemplate();
> >    while (true) {
> >      long startNanos = System.nanoTime();
> >      template.requestBody(sendUrl, "abc");
> >      long elapsedNanos = System.nanoTime() - startNanos;
> >      System.err.println(String.format("received reply in: %.3f s",
> > elapsedNanos / 1000000000.0));
> >    }
> >  }
> >
> > }
> >
> >
> > [4] TestReplyToServer.java
> >
> > package test;
> >
> > import org.apache.activemq.broker.BrokerService;
> > import org.apache.activemq.camel.component.ActiveMQComponent;
> > import org.apache.camel.CamelContext;
> > import org.apache.camel.Exchange;
> > import org.apache.camel.Processor;
> > import org.apache.camel.builder.RouteBuilder;
> > import org.apache.camel.impl.DefaultCamelContext;
> >
> > public class TestReplyToServer {
> >
> >  private static final String BROKER_NAME = "thebroker";
> >
> >  public static void main(String... args) throws Exception {
> >    startBroker();
> >    startCamel();
> >    Thread.sleep(Long.MAX_VALUE);
> >  }
> >
> >  private static void startBroker() throws Exception {
> >    BrokerService brokerService = new BrokerService();
> >    brokerService.setBrokerName(BROKER_NAME);
> >    brokerService.setSchedulerSupport(false);
> >    brokerService.setPersistent(false);
> >    brokerService.addConnector("tcp://0.0.0.0:7001");
> >    brokerService.start();
> >    brokerService.waitUntilStarted();
> >  }
> >
> >
> >  private static void startCamel() throws Exception {
> >    CamelContext context = new DefaultCamelContext();
> >
> >    ActiveMQComponent activemqComponent =
> > ActiveMQComponent.activeMQComponent();
> >    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> > BROKER_NAME));
> >    context.addComponent("activemq", activemqComponent);
> >
> >    final String receiveUrl = "activemq:queue:dest";
> >    context.addRoutes(new RouteBuilder() {
> >      @Override
> >      public void configure() throws Exception {
> >        from(receiveUrl).process(new Processor() {
> >          @Override
> >          public void process(Exchange exchange) throws Exception {
> >            System.err.println("received request");
> >            exchange.getOut().setBody("reply");
> >          }
> >        });
> >      }
> >    });
> >
> >    context.start();
> >    System.err.println("listening on url: " + receiveUrl);
> >  }
> >
> > }
> >
> >
> >
> >
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cibsen@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>



-- 
--
David J. M. Karlsen - http://www.linkedin.com/in/davidkarlsen

Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
Hi Jim, others

I got some code for supporting exclusive replyTo queues. I just want
to know if you run in a clustered environment.
As the replyTo queue would be exclusive to each CamelContext (eg each
node). So if you have 2 nodes running with Camel then each node would
have to use an unique replyTo queue.

If you run in a single instance of CamelContext then there is of
course no problem.

Alternative you can always use shared queues in a cluster, as each
CamelContext (each node) will only pickup intended reply message for
it (using a JMS selector), which is also the case why its slower. You
can tweak the performance with the receiveTimeout. That is default 1
sec. If you set it lower, then it will potential react faster to new
replies coming back.



On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com> wrote:
>
> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
> specify a custom replyTo destination on the endpoint url, the time it takes
> for the producer to receive a reply increases drastically.  The curious
> thing is that the time to receive a reply is almost exactly 1 second.  When
> I remove the replyTo from the url, everything's fast again.
>
> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>  There is a server class [4] which runs an embedded instance of ActiveMQ and
> simply replies to messages as they arrive; and a client [3] class which
> simply sends messages to the server, and prints the elapsed time.  The
> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
> value is added to the url or not.
>
> The client output when USE_REPLY_TO is false is shown as [1].  The client
> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
> issue?
>
> Thanks!
> Jim
>
>
> [1] USE_REPLY_TO = false
>
> received reply in: 0.476 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> ...
>
>
> [2] USE_REPLY_TO = true
>
> received reply in: 1.524 s
> received reply in: 1.002 s
> received reply in: 1.003 s
> received reply in: 1.003 s
> received reply in: 1.002 s
> ...
>
>
> [3] TestReplyToClient.java
>
> package test;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToClient {
>
>  private static final boolean USE_REPLY_TO = false;
>
>  public static void main(String... args) throws Exception {
>    // create camel context; configure activemq component for
> tcp://localhost:7001
>    CamelContext context = new DefaultCamelContext();
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>      null, null, "tcp://localhost:7001"));
>    context.addComponent("activemq", activemqComponent);
>    context.start();
>
>    // define url to send requests to
>    String sendUrl = "activemq:queue:dest";
>    if (USE_REPLY_TO) {
>      sendUrl += "?replyTo=replyQueue";
>    }
>    System.err.println("sending to url: " + sendUrl);
>
>    // repeatedly send requests; measure elapsed time
>    ProducerTemplate template = context.createProducerTemplate();
>    while (true) {
>      long startNanos = System.nanoTime();
>      template.requestBody(sendUrl, "abc");
>      long elapsedNanos = System.nanoTime() - startNanos;
>      System.err.println(String.format("received reply in: %.3f s",
> elapsedNanos / 1000000000.0));
>    }
>  }
>
> }
>
>
> [4] TestReplyToServer.java
>
> package test;
>
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToServer {
>
>  private static final String BROKER_NAME = "thebroker";
>
>  public static void main(String... args) throws Exception {
>    startBroker();
>    startCamel();
>    Thread.sleep(Long.MAX_VALUE);
>  }
>
>  private static void startBroker() throws Exception {
>    BrokerService brokerService = new BrokerService();
>    brokerService.setBrokerName(BROKER_NAME);
>    brokerService.setSchedulerSupport(false);
>    brokerService.setPersistent(false);
>    brokerService.addConnector("tcp://0.0.0.0:7001");
>    brokerService.start();
>    brokerService.waitUntilStarted();
>  }
>
>
>  private static void startCamel() throws Exception {
>    CamelContext context = new DefaultCamelContext();
>
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> BROKER_NAME));
>    context.addComponent("activemq", activemqComponent);
>
>    final String receiveUrl = "activemq:queue:dest";
>    context.addRoutes(new RouteBuilder() {
>      @Override
>      public void configure() throws Exception {
>        from(receiveUrl).process(new Processor() {
>          @Override
>          public void process(Exchange exchange) throws Exception {
>            System.err.println("received request");
>            exchange.getOut().setBody("reply");
>          }
>        });
>      }
>    });
>
>    context.start();
>    System.err.println("listening on url: " + receiveUrl);
>  }
>
> }
>
>
>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by Claus Ibsen <cl...@gmail.com>.
On Fri, Jul 8, 2011 at 11:04 AM, Sander Mak <sa...@gmail.com> wrote:
> Could this be caused by the polling nature of the underlying Spring
> DefaultMessageListenerContainer class (which has 1 second intervals by
> default IIRC). However, I don't know how the internals of reply-to
> work and whether it uses the listenercontainer.
>

Yes the ReplyManager in camel-jms uses the spring listener container
to listen/poll replies.


>
> Sander
>
> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com> wrote:
>>
>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
>> specify a custom replyTo destination on the endpoint url, the time it takes
>> for the producer to receive a reply increases drastically.  The curious
>> thing is that the time to receive a reply is almost exactly 1 second.  When
>> I remove the replyTo from the url, everything's fast again.
>>
>> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>>  There is a server class [4] which runs an embedded instance of ActiveMQ and
>> simply replies to messages as they arrive; and a client [3] class which
>> simply sends messages to the server, and prints the elapsed time.  The
>> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
>> value is added to the url or not.
>>
>> The client output when USE_REPLY_TO is false is shown as [1].  The client
>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
>> issue?
>>
>> Thanks!
>> Jim
>>
>>
>> [1] USE_REPLY_TO = false
>>
>> received reply in: 0.476 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> ...
>>
>>
>> [2] USE_REPLY_TO = true
>>
>> received reply in: 1.524 s
>> received reply in: 1.002 s
>> received reply in: 1.003 s
>> received reply in: 1.003 s
>> received reply in: 1.002 s
>> ...
>>
>>
>> [3] TestReplyToClient.java
>>
>> package test;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.ProducerTemplate;
>> import org.apache.camel.impl.DefaultCamelContext;
>>
>> public class TestReplyToClient {
>>
>>  private static final boolean USE_REPLY_TO = false;
>>
>>  public static void main(String... args) throws Exception {
>>    // create camel context; configure activemq component for
>> tcp://localhost:7001
>>    CamelContext context = new DefaultCamelContext();
>>    ActiveMQComponent activemqComponent =
>> ActiveMQComponent.activeMQComponent();
>>    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>>      null, null, "tcp://localhost:7001"));
>>    context.addComponent("activemq", activemqComponent);
>>    context.start();
>>
>>    // define url to send requests to
>>    String sendUrl = "activemq:queue:dest";
>>    if (USE_REPLY_TO) {
>>      sendUrl += "?replyTo=replyQueue";
>>    }
>>    System.err.println("sending to url: " + sendUrl);
>>
>>    // repeatedly send requests; measure elapsed time
>>    ProducerTemplate template = context.createProducerTemplate();
>>    while (true) {
>>      long startNanos = System.nanoTime();
>>      template.requestBody(sendUrl, "abc");
>>      long elapsedNanos = System.nanoTime() - startNanos;
>>      System.err.println(String.format("received reply in: %.3f s",
>> elapsedNanos / 1000000000.0));
>>    }
>>  }
>>
>> }
>>
>>
>> [4] TestReplyToServer.java
>>
>> package test;
>>
>> import org.apache.activemq.broker.BrokerService;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.impl.DefaultCamelContext;
>>
>> public class TestReplyToServer {
>>
>>  private static final String BROKER_NAME = "thebroker";
>>
>>  public static void main(String... args) throws Exception {
>>    startBroker();
>>    startCamel();
>>    Thread.sleep(Long.MAX_VALUE);
>>  }
>>
>>  private static void startBroker() throws Exception {
>>    BrokerService brokerService = new BrokerService();
>>    brokerService.setBrokerName(BROKER_NAME);
>>    brokerService.setSchedulerSupport(false);
>>    brokerService.setPersistent(false);
>>    brokerService.addConnector("tcp://0.0.0.0:7001");
>>    brokerService.start();
>>    brokerService.waitUntilStarted();
>>  }
>>
>>
>>  private static void startCamel() throws Exception {
>>    CamelContext context = new DefaultCamelContext();
>>
>>    ActiveMQComponent activemqComponent =
>> ActiveMQComponent.activeMQComponent();
>>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>> BROKER_NAME));
>>    context.addComponent("activemq", activemqComponent);
>>
>>    final String receiveUrl = "activemq:queue:dest";
>>    context.addRoutes(new RouteBuilder() {
>>      @Override
>>      public void configure() throws Exception {
>>        from(receiveUrl).process(new Processor() {
>>          @Override
>>          public void process(Exchange exchange) throws Exception {
>>            System.err.println("received request");
>>            exchange.getOut().setBody("reply");
>>          }
>>        });
>>      }
>>    });
>>
>>    context.start();
>>    System.err.println("listening on url: " + receiveUrl);
>>  }
>>
>> }
>>
>>
>>
>>
>>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: slow reply for jms component when url contains replyTo

Posted by Sander Mak <sa...@gmail.com>.
Could this be caused by the polling nature of the underlying Spring
DefaultMessageListenerContainer class (which has 1 second intervals by
default IIRC). However, I don't know how the internals of reply-to
work and whether it uses the listenercontainer.


Sander

On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jn...@referentia.com> wrote:
>
> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
> specify a custom replyTo destination on the endpoint url, the time it takes
> for the producer to receive a reply increases drastically.  The curious
> thing is that the time to receive a reply is almost exactly 1 second.  When
> I remove the replyTo from the url, everything's fast again.
>
> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>  There is a server class [4] which runs an embedded instance of ActiveMQ and
> simply replies to messages as they arrive; and a client [3] class which
> simply sends messages to the server, and prints the elapsed time.  The
> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
> value is added to the url or not.
>
> The client output when USE_REPLY_TO is false is shown as [1].  The client
> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
> issue?
>
> Thanks!
> Jim
>
>
> [1] USE_REPLY_TO = false
>
> received reply in: 0.476 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> received reply in: 0.006 s
> ...
>
>
> [2] USE_REPLY_TO = true
>
> received reply in: 1.524 s
> received reply in: 1.002 s
> received reply in: 1.003 s
> received reply in: 1.003 s
> received reply in: 1.002 s
> ...
>
>
> [3] TestReplyToClient.java
>
> package test;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToClient {
>
>  private static final boolean USE_REPLY_TO = false;
>
>  public static void main(String... args) throws Exception {
>    // create camel context; configure activemq component for
> tcp://localhost:7001
>    CamelContext context = new DefaultCamelContext();
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>      null, null, "tcp://localhost:7001"));
>    context.addComponent("activemq", activemqComponent);
>    context.start();
>
>    // define url to send requests to
>    String sendUrl = "activemq:queue:dest";
>    if (USE_REPLY_TO) {
>      sendUrl += "?replyTo=replyQueue";
>    }
>    System.err.println("sending to url: " + sendUrl);
>
>    // repeatedly send requests; measure elapsed time
>    ProducerTemplate template = context.createProducerTemplate();
>    while (true) {
>      long startNanos = System.nanoTime();
>      template.requestBody(sendUrl, "abc");
>      long elapsedNanos = System.nanoTime() - startNanos;
>      System.err.println(String.format("received reply in: %.3f s",
> elapsedNanos / 1000000000.0));
>    }
>  }
>
> }
>
>
> [4] TestReplyToServer.java
>
> package test;
>
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TestReplyToServer {
>
>  private static final String BROKER_NAME = "thebroker";
>
>  public static void main(String... args) throws Exception {
>    startBroker();
>    startCamel();
>    Thread.sleep(Long.MAX_VALUE);
>  }
>
>  private static void startBroker() throws Exception {
>    BrokerService brokerService = new BrokerService();
>    brokerService.setBrokerName(BROKER_NAME);
>    brokerService.setSchedulerSupport(false);
>    brokerService.setPersistent(false);
>    brokerService.addConnector("tcp://0.0.0.0:7001");
>    brokerService.start();
>    brokerService.waitUntilStarted();
>  }
>
>
>  private static void startCamel() throws Exception {
>    CamelContext context = new DefaultCamelContext();
>
>    ActiveMQComponent activemqComponent =
> ActiveMQComponent.activeMQComponent();
>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> BROKER_NAME));
>    context.addComponent("activemq", activemqComponent);
>
>    final String receiveUrl = "activemq:queue:dest";
>    context.addRoutes(new RouteBuilder() {
>      @Override
>      public void configure() throws Exception {
>        from(receiveUrl).process(new Processor() {
>          @Override
>          public void process(Exchange exchange) throws Exception {
>            System.err.println("received request");
>            exchange.getOut().setBody("reply");
>          }
>        });
>      }
>    });
>
>    context.start();
>    System.err.println("listening on url: " + receiveUrl);
>  }
>
> }
>
>
>
>
>