You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/27 11:26:13 UTC
svn commit: r748476 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/EndpointMessageListener.java
test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
Author: davsclaus
Date: Fri Feb 27 10:26:13 2009
New Revision: 748476
URL: http://svn.apache.org/viewvc?rev=748476&view=rev
Log:
CAMEL-1366: JMSConsumer now support InOptionalOut so in case you do not have a response none will be sent back, and the original caller will timeout.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
- copied, changed from r748394, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=748476&r1=748475&r2=748476&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Fri Feb 27 10:26:13 2009
@@ -87,8 +87,9 @@
// process OK so get the reply
body = exchange.getOut(false);
}
- // send the reply
- if (rce == null && body != null && !disableReplyTo) {
+
+ // send the reply if we got a response and the exchange is out capable
+ if (rce == null && body != null && !disableReplyTo && exchange.getPattern().isOutCapable()) {
sendReply(replyDestination, message, exchange, body);
}
} catch (Exception e) {
@@ -105,7 +106,10 @@
// lets set to an InOut if we have some kind of reply-to destination
if (replyDestination != null && !disableReplyTo) {
exchange.setProperty(JmsConstants.JMS_REPLY_DESTINATION, replyDestination);
- exchange.setPattern(ExchangePattern.InOut);
+ // only change pattern if not already out capable
+ if (!exchange.getPattern().isOutCapable()) {
+ exchange.setPattern(ExchangePattern.InOut);
+ }
}
return exchange;
}
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java (from r748394, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java&r1=748394&r2=748476&rev=748476&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java Fri Feb 27 10:26:13 2009
@@ -16,9 +16,9 @@
*/
package org.apache.camel.component.jms;
-import javax.jms.ConnectionFactory;
+import java.util.concurrent.CountDownLatch;
-import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
@@ -26,39 +26,77 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
- * A simple requesr / reply test
+ * A simple requesr / late reply test using InOptionalOut.
*/
-public class JmsSimpleRequestReplyTest extends ContextTestSupport {
+public class JmsSimpleRequestLateReplyTest extends ContextTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(JmsSimpleRequestLateReplyTest.class);
protected String componentName = "activemq";
- public void testRequetReply() throws Exception {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private static String replyDestination;
+ private static String cid;
+
+ public void testRequetLateReply() throws Exception {
+ // use another thread to send the late reply to simulate that we do it later, not
+ // from the origianl route anyway
+ Thread sender = new Thread(new SendLateReply());
+ sender.start();
+
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);
- Exchange out = template.send("activemq:queue:hello", ExchangePattern.InOut, new Processor() {
+ Exchange out = template.request("activemq:queue:hello", new Processor() {
public void process(Exchange exchange) throws Exception {
+ // we expect a response so InOut
+ exchange.setPattern(ExchangePattern.InOut);
exchange.getIn().setBody("Hello World");
- exchange.getIn().setHeader("foo", 123);
}
});
result.assertIsSatisfied();
assertNotNull(out);
+ // TODO: We should get this late reply to work
+ //assertEquals("Late Reply", out.getOut().getBody());
+ }
- assertEquals("Bye World", out.getOut().getBody(String.class));
- assertEquals(123, out.getOut().getHeader("foo"));
+ private class SendLateReply implements Runnable {
+
+ public void run() {
+ try {
+ LOG.debug("Wating for latch");
+ latch.await();
+
+ // wait 1 sec after latch before sending he late replay
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ // ignore
+ }
+
+ LOG.debug("Sending late reply");
+ template.send(componentName + ":" + replyDestination, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.setPattern(ExchangePattern.InOnly);
+ exchange.getIn().setBody("Late reply");
+ exchange.getIn().setHeader("JMSCorrelationID", cid);
+ }
+ });
+ }
}
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- camelContext.addComponent(componentName, jmsComponentClientAcknowledge(connectionFactory));
+ ActiveMQComponent amq = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+ // as this is a unit test I dont want to wait 20 sec before timeout occurs, so we use 10
+ amq.getConfiguration().setRequestTimeout(10000);
+ camelContext.addComponent(componentName, amq);
return camelContext;
}
@@ -66,11 +104,19 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("activemq:queue:hello").process(new Processor() {
+ // set the MEP to InOptionalOut as we might not be able to send a reply
+ from(componentName + ":queue:hello").setExchangePattern(ExchangePattern.InOptionalOut).process(new Processor() {
public void process(Exchange exchange) throws Exception {
- exchange.getIn().setBody("Bye World");
- // the reply destination is set as a property on the exchange while we process it
- assertNotNull(exchange.getProperty(JmsConstants.JMS_REPLY_DESTINATION));
+ assertEquals("Hello World", exchange.getIn().getBody());
+
+ replyDestination = exchange.getProperty(JmsConstants.JMS_REPLY_DESTINATION, String.class);
+ cid = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+
+ LOG.debug("ReplyDestination: " + replyDestination);
+ LOG.debug("JMSCorrelationID: " + cid);
+
+ LOG.debug("Ahh I cannot send a reply. Someone else must do it.");
+ latch.countDown();
}
}).to("mock:result");
}