You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2009/02/27 13:21:39 UTC
svn commit: r748494 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: jstrachan
Date: Fri Feb 27 12:21:39 2009
New Revision: 748494
URL: http://svn.apache.org/viewvc?rev=748494&view=rev
Log:
fix for CAMEL-1405 so we can easily create JMS endpoints from incoming Destination objects, or send to arbitrary destination objects as well as fixing Claus's test case
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java Fri Feb 27 12:21:39 2009
@@ -21,7 +21,8 @@
*/
public final class JmsConstants {
- public static final String JMS_REPLY_DESTINATION = "CamelJmsReplyDestination";
+ public static final String JMS_REPLY_DESTINATION = "JMSReplyTo"; //"CamelJmsReplyDestination";
+ public static final String JMS_DESTINATION = "JMSDestination";
private JmsConstants() {
// utility class
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Feb 27 12:21:39 2009
@@ -22,6 +22,9 @@
import javax.jms.Message;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.Queue;
import org.apache.camel.Component;
import org.apache.camel.Exchange;
@@ -55,10 +58,44 @@
private JmsConfiguration configuration;
private Requestor requestor;
+ /**
+ * Returns a new JMS endpoint for the given JMS destination using the configuration from the given JMS component
+ */
+ public static JmsEndpoint newInstance(Destination destination, JmsComponent component) throws JMSException {
+ JmsEndpoint answer = newInstance(destination);
+ JmsConfiguration newConfiguration = component.getConfiguration().copy();
+ answer.setConfiguration(newConfiguration);
+ answer.setCamelContext(component.getCamelContext());
+ return answer;
+ }
+
+ /**
+ * Returns a new JMS endpoint for the given JMS destination
+ */
+ public static JmsEndpoint newInstance(Destination destination) throws JMSException {
+ if (destination instanceof TemporaryQueue) {
+ return new JmsTemporaryQueueEndpoint((TemporaryQueue) destination);
+ }
+ if (destination instanceof TemporaryTopic) {
+ return new JmsTemporaryTopicEndpoint((TemporaryTopic) destination);
+ }
+ if (destination instanceof Queue) {
+ return new JmsQueueEndpoint((Queue) destination);
+ }
+ else {
+ return new JmsEndpoint((Topic) destination);
+ }
+ }
+
public JmsEndpoint() {
this(null, null);
}
+ public JmsEndpoint(Topic destination) throws JMSException {
+ this("jms:topic:" + destination.getTopicName(), null);
+ this.destination = destination;
+ }
+
public JmsEndpoint(String uri, JmsComponent component, String destinationName, boolean pubSubDomain, JmsConfiguration configuration) {
super(uri, component);
this.configuration = configuration;
@@ -746,4 +783,5 @@
}
return super.createEndpointUri();
}
+
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Fri Feb 27 12:21:39 2009
@@ -147,7 +147,10 @@
final org.apache.camel.Message in = exchange.getIn();
String destinationName = endpoint.getDestinationName();
- Destination destination = endpoint.getDestination();
+ Destination destination = exchange.getProperty(JmsConstants.JMS_DESTINATION, Destination.class);
+ if (destination == null) {
+ destination = endpoint.getDestination();
+ }
if (exchange.getPattern().isOutCapable()) {
testAndSetRequestor();
@@ -245,11 +248,11 @@
return message;
}
};
- if (destinationName != null) {
- getInOnlyTemplate().send(destinationName, messageCreator);
- } else if (destination != null) {
+ if (destination != null) {
getInOnlyTemplate().send(destination, messageCreator);
- } else {
+ } else if (destinationName != null) {
+ getInOnlyTemplate().send(destinationName, messageCreator);
+ } else {
throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Fri Feb 27 12:21:39 2009
@@ -25,6 +25,10 @@
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsOperations;
+import javax.jms.Topic;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
/**
* An endpoint for a JMS Queue which is also browsable
*
@@ -36,6 +40,11 @@
private int maximumBrowseSize = -1;
private final QueueBrowseStrategy queueBrowseStrategy;
+ public JmsQueueEndpoint(Queue destination) throws JMSException {
+ this("jms:queue:" + destination.getQueueName(), null);
+ setDestination(destination);
+ }
+
public JmsQueueEndpoint(String uri, JmsComponent component, String destination,
JmsConfiguration configuration) {
this(uri, component, destination, configuration, null);
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java Fri Feb 27 12:21:39 2009
@@ -43,6 +43,13 @@
super(endpointUri, destination);
}
+ public JmsTemporaryQueueEndpoint(TemporaryQueue jmsDestination) throws JMSException {
+ super("jms:temp:queue:" + jmsDestination.getQueueName(), null);
+ this.jmsDestination = jmsDestination;
+ setDestination(jmsDestination);
+ }
+
+
/**
* This endpoint is a singleton so that the temporary destination instances are shared across all
* producers and consumers of the same endpoint URI
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java Fri Feb 27 12:21:39 2009
@@ -39,6 +39,12 @@
super(endpointUri, destination);
}
+ public JmsTemporaryTopicEndpoint(TemporaryTopic jmsDestination) throws JMSException {
+ super("jms:temp:topic:" + jmsDestination.getTopicName(), null);
+ this.jmsDestination = jmsDestination;
+ setDestination(jmsDestination);
+ }
+
/**
* This endpoint is a singleton so that the temporary destination instances are shared across all
* producers and consumers of the same endpoint URI
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java?rev=748494&r1=748493&r2=748494&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java Fri Feb 27 12:21:39 2009
@@ -24,11 +24,16 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
+import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+
/**
* A simple requesr / late reply test using InOptionalOut.
*/
@@ -36,22 +41,35 @@
private static final Log LOG = LogFactory.getLog(JmsSimpleRequestLateReplyTest.class);
- protected String componentName = "activemq";
-
+ protected String expectedBody = "Late Reply";
+
private final CountDownLatch latch = new CountDownLatch(1);
- private static String replyDestination;
+ private static Destination replyDestination;
private static String cid;
+ protected ActiveMQComponent activeMQComponent;
+
+ public void testRequestLateReplyUsingCustomDestinationHeaderForReply() throws Exception {
+ Runnable runnable = new SendLateReply();
+ doTest(runnable);
+
+ }
+
+ public void testRequestLateReplyUsingDestinationEndpointForReply() throws Exception {
+ // use another thread to send the late reply to simulate that we do it later, not
+ // from the origianl route anyway
+ doTest(new SendLateReplyUsingTemporaryEndpoint());
+ }
- public void testRequetLateReply() throws Exception {
+ protected void doTest(Runnable runnable) throws InterruptedException {
// use another thread to send the late reply to simulate that we do it later, not
// from the origianl route anyway
- Thread sender = new Thread(new SendLateReply());
+ Thread sender = new Thread(runnable);
sender.start();
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);
- Exchange out = template.request("activemq:queue:hello", new Processor() {
+ Exchange out = template.request(getQueueEndpointName(), new Processor() {
public void process(Exchange exchange) throws Exception {
// we expect a response so InOut
exchange.setPattern(ExchangePattern.InOut);
@@ -62,8 +80,7 @@
result.assertIsSatisfied();
assertNotNull(out);
- // TODO: We should get this late reply to work
- //assertEquals("Late Reply", out.getOut().getBody());
+ assertEquals(expectedBody, out.getOut().getBody());
}
private class SendLateReply implements Runnable {
@@ -80,23 +97,61 @@
}
LOG.debug("Sending late reply");
- template.send(componentName + ":" + replyDestination, new Processor() {
+ template.send("activemq:dummy", new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.setPattern(ExchangePattern.InOnly);
- exchange.getIn().setBody("Late reply");
- exchange.getIn().setHeader("JMSCorrelationID", cid);
+ exchange.setProperty(JmsConstants.JMS_DESTINATION, replyDestination);
+
+ Message in = exchange.getIn();
+ in.setBody(expectedBody);
+ in.setHeader("JMSCorrelationID", cid);
}
});
}
}
+ private class SendLateReplyUsingTemporaryEndpoint implements Runnable {
+
+ public void run() {
+ try {
+ LOG.debug("Wating for latch");
+ latch.await();
+
+ // wait 1 sec after latch before sending he late replay
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ // ignore
+ }
+
+ LOG.debug("Sending late reply");
+
+ try {
+ JmsEndpoint endpoint = JmsEndpoint.newInstance(replyDestination, activeMQComponent);
+
+ template.send(endpoint, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.setPattern(ExchangePattern.InOnly);
+
+ Message in = exchange.getIn();
+ in.setBody(expectedBody);
+ in.setHeader("JMSCorrelationID", cid);
+ }
+ });
+
+ } catch (JMSException e) {
+ LOG.error("Failed to create the endpoint for " + replyDestination);
+ }
+ }
+ }
+
+
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
- ActiveMQComponent amq = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+ activeMQComponent = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
// as this is a unit test I dont want to wait 20 sec before timeout occurs, so we use 10
- amq.getConfiguration().setRequestTimeout(10000);
- camelContext.addComponent(componentName, amq);
+ activeMQComponent.getConfiguration().setRequestTimeout(10000);
+ camelContext.addComponent("activemq", activeMQComponent);
return camelContext;
}
@@ -105,12 +160,13 @@
return new RouteBuilder() {
public void configure() throws Exception {
// set the MEP to InOptionalOut as we might not be able to send a reply
- from(componentName + ":queue:hello").setExchangePattern(ExchangePattern.InOptionalOut).process(new Processor() {
+ from(getQueueEndpointName()).setExchangePattern(ExchangePattern.InOptionalOut).process(new Processor() {
public void process(Exchange exchange) throws Exception {
- assertEquals("Hello World", exchange.getIn().getBody());
+ Message in = exchange.getIn();
+ assertEquals("Hello World", in.getBody());
- replyDestination = exchange.getProperty(JmsConstants.JMS_REPLY_DESTINATION, String.class);
- cid = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+ replyDestination = in.getHeader(JmsConstants.JMS_REPLY_DESTINATION, Destination.class);
+ cid = in.getHeader("JMSCorrelationID", String.class);
LOG.debug("ReplyDestination: " + replyDestination);
LOG.debug("JMSCorrelationID: " + cid);
@@ -122,4 +178,11 @@
}
};
}
+
+
+ protected String getQueueEndpointName() {
+ // lets use a different queue name for each test
+ return "activemq:queue:hello." + getName();
+ }
+
}
\ No newline at end of file