You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2009/02/27 13:21:39 UTC

svn commit: r748494 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/

Author: jstrachan
Date: Fri Feb 27 12:21:39 2009
New Revision: 748494

URL: http://svn.apache.org/viewvc?rev=748494&view=rev
Log:
fix for CAMEL-1405 so we can easily create JMS endpoints from incoming Destination objects, or send to arbitrary destination objects as well as fixing Claus's test case

Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java Fri Feb 27 12:21:39 2009
@@ -21,7 +21,8 @@
  */
 public final class JmsConstants {
 
-    public static final String JMS_REPLY_DESTINATION = "CamelJmsReplyDestination";
+    public static final String JMS_REPLY_DESTINATION = "JMSReplyTo"; //"CamelJmsReplyDestination";
+    public static final String JMS_DESTINATION = "JMSDestination";
 
     private JmsConstants() {
         // utility class

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Feb 27 12:21:39 2009
@@ -22,6 +22,9 @@
 import javax.jms.Message;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.Queue;
 
 import org.apache.camel.Component;
 import org.apache.camel.Exchange;
@@ -55,10 +58,44 @@
     private JmsConfiguration configuration;
     private Requestor requestor;
 
+    /**
+     * Returns a new JMS endpoint for the given JMS destination using the configuration from the given JMS component
+     */
+    public static JmsEndpoint newInstance(Destination destination, JmsComponent component) throws JMSException {
+        JmsEndpoint answer = newInstance(destination);
+        JmsConfiguration newConfiguration = component.getConfiguration().copy();
+        answer.setConfiguration(newConfiguration);
+        answer.setCamelContext(component.getCamelContext());
+        return answer;
+    }
+
+    /**
+     * Returns a new JMS endpoint for the given JMS destination
+     */
+    public static JmsEndpoint newInstance(Destination destination) throws JMSException {
+        if (destination instanceof TemporaryQueue) {
+            return new JmsTemporaryQueueEndpoint((TemporaryQueue) destination);
+        }
+        if (destination instanceof TemporaryTopic) {
+            return new JmsTemporaryTopicEndpoint((TemporaryTopic) destination);
+        }
+        if (destination instanceof Queue) {
+            return new JmsQueueEndpoint((Queue) destination);
+        }
+        else {
+            return new JmsEndpoint((Topic) destination);
+        }
+    }
+
     public JmsEndpoint() {
         this(null, null);
     }
 
+    public JmsEndpoint(Topic destination) throws JMSException {
+        this("jms:topic:" + destination.getTopicName(), null);
+        this.destination = destination;
+    }
+
     public JmsEndpoint(String uri, JmsComponent component, String destinationName, boolean pubSubDomain, JmsConfiguration configuration) {
         super(uri, component);
         this.configuration = configuration;
@@ -746,4 +783,5 @@
         }
         return super.createEndpointUri();
     }
+
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Fri Feb 27 12:21:39 2009
@@ -147,7 +147,10 @@
         final org.apache.camel.Message in = exchange.getIn();
 
         String destinationName = endpoint.getDestinationName();
-        Destination destination = endpoint.getDestination();
+        Destination destination = exchange.getProperty(JmsConstants.JMS_DESTINATION, Destination.class);
+        if (destination == null) {
+            destination = endpoint.getDestination();
+        }
         if (exchange.getPattern().isOutCapable()) {
 
             testAndSetRequestor();
@@ -245,11 +248,11 @@
                     return message;
                 }
             };
-            if (destinationName != null) {
-                getInOnlyTemplate().send(destinationName, messageCreator);
-            } else if (destination != null) {
+            if (destination != null) {
                 getInOnlyTemplate().send(destination, messageCreator);
-            } else {
+            } else if (destinationName != null) {
+                getInOnlyTemplate().send(destinationName, messageCreator);
+            } else  {
                 throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
             }
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Fri Feb 27 12:21:39 2009
@@ -25,6 +25,10 @@
 import org.apache.commons.logging.LogFactory;
 import org.springframework.jms.core.JmsOperations;
 
+import javax.jms.Topic;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
 /**
  * An endpoint for a JMS Queue which is also browsable
  *
@@ -36,6 +40,11 @@
     private int maximumBrowseSize = -1;
     private final QueueBrowseStrategy queueBrowseStrategy;
 
+    public JmsQueueEndpoint(Queue destination) throws JMSException {
+        this("jms:queue:" + destination.getQueueName(), null);
+        setDestination(destination);
+    }
+    
     public JmsQueueEndpoint(String uri, JmsComponent component, String destination,
             JmsConfiguration configuration) {
         this(uri, component, destination, configuration, null);

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java Fri Feb 27 12:21:39 2009
@@ -43,6 +43,13 @@
         super(endpointUri, destination);
     }
 
+    public JmsTemporaryQueueEndpoint(TemporaryQueue jmsDestination) throws JMSException {
+        super("jms:temp:queue:" + jmsDestination.getQueueName(), null);
+        this.jmsDestination = jmsDestination;
+        setDestination(jmsDestination);
+    }
+
+
     /**
      * This endpoint is a singleton so that the temporary destination instances are shared across all
      * producers and consumers of the same endpoint URI

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java Fri Feb 27 12:21:39 2009
@@ -39,6 +39,12 @@
         super(endpointUri, destination);
     }
 
+    public JmsTemporaryTopicEndpoint(TemporaryTopic jmsDestination) throws JMSException {
+        super("jms:temp:topic:" + jmsDestination.getTopicName(), null);
+        this.jmsDestination = jmsDestination;
+        setDestination(jmsDestination);
+    }
+
     /**
      * This endpoint is a singleton so that the temporary destination instances are shared across all
      * producers and consumers of the same endpoint URI

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java Fri Feb 27 12:21:39 2009
@@ -24,11 +24,16 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
+import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+
 /**
  * A simple requesr / late reply test using InOptionalOut.
  */
@@ -36,22 +41,35 @@
 
     private static final Log LOG = LogFactory.getLog(JmsSimpleRequestLateReplyTest.class);
 
-    protected String componentName = "activemq";
-
+    protected String expectedBody = "Late Reply";
+             
     private final CountDownLatch latch = new CountDownLatch(1);
-    private static String replyDestination;
+    private static Destination replyDestination;
     private static String cid;
+    protected ActiveMQComponent activeMQComponent;
+
+    public void testRequestLateReplyUsingCustomDestinationHeaderForReply() throws Exception {
+        Runnable runnable = new SendLateReply();
+        doTest(runnable);
+
+    }
+    
+    public void testRequestLateReplyUsingDestinationEndpointForReply() throws Exception {
+        // use another thread to send the late reply to simulate that we do it later, not
+        // from the original route anyway
+        doTest(new SendLateReplyUsingTemporaryEndpoint());
+    }
 
-    public void testRequetLateReply() throws Exception {
+    protected void doTest(Runnable runnable) throws InterruptedException {
         // use another thread to send the late reply to simulate that we do it later, not
         // from the origianl route anyway
-        Thread sender = new Thread(new SendLateReply());
+        Thread sender = new Thread(runnable);
         sender.start();
 
         MockEndpoint result = getMockEndpoint("mock:result");
         result.expectedMessageCount(1);
 
-        Exchange out = template.request("activemq:queue:hello", new Processor() {
+        Exchange out = template.request(getQueueEndpointName(), new Processor() {
             public void process(Exchange exchange) throws Exception {
                 // we expect a response so InOut
                 exchange.setPattern(ExchangePattern.InOut);
@@ -62,8 +80,7 @@
         result.assertIsSatisfied();
 
         assertNotNull(out);
-        // TODO: We should get this late reply to work
-        //assertEquals("Late Reply", out.getOut().getBody());
+        assertEquals(expectedBody, out.getOut().getBody());
     }
 
     private class SendLateReply implements Runnable {
@@ -80,23 +97,61 @@
             }
 
             LOG.debug("Sending late reply");
-            template.send(componentName + ":" + replyDestination, new Processor() {
+            template.send("activemq:dummy", new Processor() {
                 public void process(Exchange exchange) throws Exception {
                     exchange.setPattern(ExchangePattern.InOnly);
-                    exchange.getIn().setBody("Late reply");
-                    exchange.getIn().setHeader("JMSCorrelationID", cid);
+                    exchange.setProperty(JmsConstants.JMS_DESTINATION, replyDestination);
+                    
+                    Message in = exchange.getIn();
+                    in.setBody(expectedBody);
+                    in.setHeader("JMSCorrelationID", cid);
                 }
             });
         }
     }
 
+    private class SendLateReplyUsingTemporaryEndpoint implements Runnable {
+
+        public void run() {
+            try {
+                LOG.debug("Wating for latch");
+                latch.await();
+
+                // wait 1 sec after latch before sending the late reply
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                // ignore
+            }
+
+            LOG.debug("Sending late reply");
+
+            try {
+                JmsEndpoint endpoint = JmsEndpoint.newInstance(replyDestination, activeMQComponent);
+
+                template.send(endpoint, new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        exchange.setPattern(ExchangePattern.InOnly);
+
+                        Message in = exchange.getIn();
+                        in.setBody(expectedBody);
+                        in.setHeader("JMSCorrelationID", cid);
+                    }
+                });
+
+            } catch (JMSException e) {
+                LOG.error("Failed to create the endpoint for " + replyDestination);
+            }
+        }
+    }
+
+
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
-        ActiveMQComponent amq = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+        activeMQComponent = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
         // as this is a unit test I dont want to wait 20 sec before timeout occurs, so we use 10
-        amq.getConfiguration().setRequestTimeout(10000);
-        camelContext.addComponent(componentName, amq);
+        activeMQComponent.getConfiguration().setRequestTimeout(10000);
+        camelContext.addComponent("activemq", activeMQComponent);
 
         return camelContext;
     }
@@ -105,12 +160,13 @@
         return new RouteBuilder() {
             public void configure() throws Exception {
                 // set the MEP to InOptionalOut as we might not be able to send a reply
-                from(componentName + ":queue:hello").setExchangePattern(ExchangePattern.InOptionalOut).process(new Processor() {
+                from(getQueueEndpointName()).setExchangePattern(ExchangePattern.InOptionalOut).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        assertEquals("Hello World", exchange.getIn().getBody());
+                        Message in = exchange.getIn();
+                        assertEquals("Hello World", in.getBody());
 
-                        replyDestination = exchange.getProperty(JmsConstants.JMS_REPLY_DESTINATION, String.class);
-                        cid = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+                        replyDestination = in.getHeader(JmsConstants.JMS_REPLY_DESTINATION, Destination.class);
+                        cid = in.getHeader("JMSCorrelationID", String.class);
 
                         LOG.debug("ReplyDestination: " + replyDestination);
                         LOG.debug("JMSCorrelationID: " + cid);
@@ -122,4 +178,11 @@
             }
         };
     }
+
+
+    protected String getQueueEndpointName() {
+        // lets use a different queue name for each test
+        return "activemq:queue:hello." + getName();
+    }
+
 }
\ No newline at end of file