You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/27 11:26:13 UTC

svn commit: r748476 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/EndpointMessageListener.java test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java

Author: davsclaus
Date: Fri Feb 27 10:26:13 2009
New Revision: 748476

URL: http://svn.apache.org/viewvc?rev=748476&view=rev
Log:
CAMEL-1366: JMSConsumer now supports InOptionalOut, so if you do not have a response none will be sent back and the original caller will time out.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
      - copied, changed from r748394, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=748476&r1=748475&r2=748476&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Fri Feb 27 10:26:13 2009
@@ -87,8 +87,9 @@
                 // process OK so get the reply
                 body = exchange.getOut(false);
             }
-            // send the reply
-            if (rce == null && body != null && !disableReplyTo) {
+
+            // send the reply if we got a response and the exchange is out capable
+            if (rce == null && body != null && !disableReplyTo && exchange.getPattern().isOutCapable()) {
                 sendReply(replyDestination, message, exchange, body);
             }
         } catch (Exception e) {
@@ -105,7 +106,10 @@
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
             exchange.setProperty(JmsConstants.JMS_REPLY_DESTINATION, replyDestination);
-            exchange.setPattern(ExchangePattern.InOut);
+            // only change pattern if not already out capable
+            if (!exchange.getPattern().isOutCapable()) {
+                exchange.setPattern(ExchangePattern.InOut);
+            }
         }
         return exchange;
     }

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java (from r748394, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java&r1=748394&r2=748476&rev=748476&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java Fri Feb 27 10:26:13 2009
@@ -16,9 +16,9 @@
  */
 package org.apache.camel.component.jms;
 
-import javax.jms.ConnectionFactory;
+import java.util.concurrent.CountDownLatch;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.camel.component.ActiveMQComponent;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
@@ -26,39 +26,77 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
- * A simple requesr / reply test
+ * A simple request / late reply test using InOptionalOut.
  */
-public class JmsSimpleRequestReplyTest extends ContextTestSupport {
+public class JmsSimpleRequestLateReplyTest extends ContextTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(JmsSimpleRequestLateReplyTest.class);
 
     protected String componentName = "activemq";
 
-    public void testRequetReply() throws Exception {
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private static String replyDestination;
+    private static String cid;
+
+    public void testRequetLateReply() throws Exception {
+        // use another thread to send the late reply, to simulate that it is sent later and
+        // not from the original route
+        Thread sender = new Thread(new SendLateReply());
+        sender.start();
+
         MockEndpoint result = getMockEndpoint("mock:result");
         result.expectedMessageCount(1);
 
-        Exchange out = template.send("activemq:queue:hello", ExchangePattern.InOut, new Processor() {
+        Exchange out = template.request("activemq:queue:hello", new Processor() {
             public void process(Exchange exchange) throws Exception {
+                // we expect a response so InOut
+                exchange.setPattern(ExchangePattern.InOut);
                 exchange.getIn().setBody("Hello World");
-                exchange.getIn().setHeader("foo", 123);
             }
         });
 
         result.assertIsSatisfied();
 
         assertNotNull(out);
+        // TODO: We should get this late reply to work
+        //assertEquals("Late Reply", out.getOut().getBody());
+    }
 
-        assertEquals("Bye World", out.getOut().getBody(String.class));
-        assertEquals(123, out.getOut().getHeader("foo"));
+    private class SendLateReply implements Runnable {
+
+        public void run() {
+            try {
+                LOG.debug("Wating for latch");
+                latch.await();
+
+                // wait 1 sec after the latch is released before sending the late reply
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                // ignore
+            }
+
+            LOG.debug("Sending late reply");
+            template.send(componentName + ":" + replyDestination, new Processor() {
+                public void process(Exchange exchange) throws Exception {
+                    exchange.setPattern(ExchangePattern.InOnly);
+                    exchange.getIn().setBody("Late reply");
+                    exchange.getIn().setHeader("JMSCorrelationID", cid);
+                }
+            });
+        }
     }
 
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-        camelContext.addComponent(componentName, jmsComponentClientAcknowledge(connectionFactory));
+        ActiveMQComponent amq = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+        // as this is a unit test I don't want to wait 20 sec before the timeout occurs, so we use 10 sec
+        amq.getConfiguration().setRequestTimeout(10000);
+        camelContext.addComponent(componentName, amq);
 
         return camelContext;
     }
@@ -66,11 +104,19 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("activemq:queue:hello").process(new Processor() {
+                // set the MEP to InOptionalOut as we might not be able to send a reply
+                from(componentName + ":queue:hello").setExchangePattern(ExchangePattern.InOptionalOut).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        exchange.getIn().setBody("Bye World");
-                        // the reply destination is set as a property on the exchange while we process it
-                        assertNotNull(exchange.getProperty(JmsConstants.JMS_REPLY_DESTINATION));
+                        assertEquals("Hello World", exchange.getIn().getBody());
+
+                        replyDestination = exchange.getProperty(JmsConstants.JMS_REPLY_DESTINATION, String.class);
+                        cid = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+
+                        LOG.debug("ReplyDestination: " + replyDestination);
+                        LOG.debug("JMSCorrelationID: " + cid);
+
+                        LOG.debug("Ahh I cannot send a reply. Someone else must do it.");
+                        latch.countDown();
                     }
                 }).to("mock:result");
             }