You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/27 14:11:55 UTC
svn commit: r748503 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Fri Feb 27 13:11:55 2009
New Revision: 748503
URL: http://svn.apache.org/viewvc?rev=748503&view=rev
Log:
CAMEL-1405: Polished code a bit. Fixed checkstyle.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestCustomReplyToTest.java
- copied, changed from r748488, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=748503&r1=748502&r2=748503&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Fri Feb 27 13:11:55 2009
@@ -33,7 +33,6 @@
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.JmsException;
-import org.springframework.jms.connection.JmsResourceHolder;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
@@ -52,7 +51,6 @@
import org.springframework.util.Assert;
import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
-
/**
* @version $Revision$
*/
@@ -146,7 +144,7 @@
*/
public JmsConfiguration copy() {
try {
- return (JmsConfiguration)clone();
+ return (JmsConfiguration) clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeCamelException(e);
}
@@ -171,31 +169,49 @@
execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
Destination destination = resolveDestinationName(session, destinationName);
- Assert.notNull(messageCreator, "MessageCreator must not be null");
- MessageProducer producer = createProducer(session, destination);
- Message message = null;
- try {
- message = messageCreator.createMessage(session);
- if (logger.isDebugEnabled()) {
- logger.debug("Sending created message: " + message);
- }
- doSend(producer, message);
- // Check commit - avoid commit call within a JTA transaction.
- if (session.getTransacted() && isSessionLocallyTransacted(session)) {
- // Transacted session created by this template -> commit.
- JmsUtils.commitIfNecessary(session);
- }
- } finally {
- JmsUtils.closeMessageProducer(producer);
- }
- if (message != null && callback != null) {
- callback.sent(message);
- }
- return null;
+ return doSendToDestination(destination, messageCreator, callback, session);
}
}, false);
}
+ public void send(final Destination destination,
+ final MessageCreator messageCreator,
+ final MessageSentCallback callback) throws JmsException {
+ execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ return doSendToDestination(destination, messageCreator, callback, session);
+ }
+ }, false);
+ }
+
+ private Object doSendToDestination(final Destination destination,
+ final MessageCreator messageCreator,
+ final MessageSentCallback callback,
+ final Session session) throws JMSException {
+
+ Assert.notNull(messageCreator, "MessageCreator must not be null");
+ MessageProducer producer = createProducer(session, destination);
+ Message message = null;
+ try {
+ message = messageCreator.createMessage(session);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending created message: " + message);
+ }
+ doSend(producer, message);
+ // Check commit - avoid commit call within a JTA transaction.
+ if (session.getTransacted() && isSessionLocallyTransacted(session)) {
+ // Transacted session created by this template -> commit.
+ JmsUtils.commitIfNecessary(session);
+ }
+ } finally {
+ JmsUtils.closeMessageProducer(producer);
+ }
+ if (message != null && callback != null) {
+ callback.sent(message);
+ }
+ return null;
+ }
+
/**
* Override so we can support preserving the Qos settings that have
* been set on the message.
@@ -228,8 +244,8 @@
}
public void send(final String destinationName,
- final MessageCreator messageCreator,
- final MessageSentCallback callback) throws JmsException {
+ final MessageCreator messageCreator,
+ final MessageSentCallback callback) throws JmsException {
execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
Destination destination = resolveDestinationName(session, destinationName);
@@ -278,10 +294,10 @@
}
if (isPubSubDomain()) {
((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(),
- message.getJMSPriority(), ttl);
+ message.getJMSPriority(), ttl);
} else {
((QueueSender) producer).send(message, message.getJMSDeliveryMode(),
- message.getJMSPriority(), ttl);
+ message.getJMSPriority(), ttl);
}
} else {
super.doSend(producer, message);
@@ -297,7 +313,7 @@
public JmsOperations createInOutTemplate(JmsEndpoint endpoint, boolean pubSubDomain, String destination, long requestTimeout) {
JmsOperations answer = createInOnlyTemplate(endpoint, pubSubDomain, destination);
if (answer instanceof JmsTemplate && requestTimeout > 0) {
- JmsTemplate jmsTemplate = (JmsTemplate)answer;
+ JmsTemplate jmsTemplate = (JmsTemplate) answer;
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setTimeToLive(requestTimeout);
jmsTemplate.setSessionTransacted(isTransactedInOut());
@@ -329,8 +345,8 @@
ConnectionFactory factory = getTemplateConnectionFactory();
JmsTemplate template = useVersion102
- ? new CamelJmsTeemplate102(this, factory, pubSubDomain)
- : new CamelJmsTemplate(this, factory);
+ ? new CamelJmsTeemplate102(this, factory, pubSubDomain)
+ : new CamelJmsTemplate(this, factory);
template.setPubSubDomain(pubSubDomain);
if (destinationResolver != null) {
@@ -417,7 +433,7 @@
* {@link #createMessageListenerContainer(JmsEndpoint)}
*
* @param listenerConnectionFactory the connection factory to use for
- * consuming messages
+ * consuming messages
*/
public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
this.listenerConnectionFactory = listenerConnectionFactory;
@@ -714,7 +730,7 @@
/**
* Should InOut operations (request reply) default to using transacted mode?
- *
+ * <p/>
* By default this is false as you need to commit the outgoing request before you can consume the input
*/
public boolean isTransactedInOut() {
@@ -724,11 +740,11 @@
public void setTransactedInOut(boolean transactedInOut) {
this.transactedInOut = transactedInOut;
}
-
+
public boolean isLazyCreateTransactionManager() {
return lazyCreateTransactionManager;
}
-
+
public void setLazyCreateTransactionManager(boolean lazyCreating) {
this.lazyCreateTransactionManager = lazyCreating;
}
@@ -744,7 +760,7 @@
* and the use of JMS properties
*
* @param eagerLoadingOfProperties whether or not to enable eager loading of
- * JMS properties on inbound messages
+ * JMS properties on inbound messages
*/
public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
this.eagerLoadingOfProperties = eagerLoadingOfProperties;
@@ -759,7 +775,7 @@
* messages are treated as InOnly rather than InOut requests.
*
* @param disableReplyTo whether or not to disable the use of JMSReplyTo
- * header indicating an InOut
+ * header indicating an InOut
*/
public void setDisableReplyTo(boolean disableReplyTo) {
this.disableReplyTo = disableReplyTo;
@@ -801,6 +817,7 @@
public void setRequestMapPurgePollTimeMillis(long requestMapPurgePollTimeMillis) {
this.requestMapPurgePollTimeMillis = requestMapPurgePollTimeMillis;
}
+
public JmsProviderMetadata getProviderMetadata() {
return providerMetadata;
}
@@ -893,7 +910,7 @@
if (container instanceof DefaultMessageListenerContainer) {
// this includes DefaultMessageListenerContainer102
- DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
+ DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer) container;
if (concurrentConsumers >= 0) {
listenerContainer.setConcurrentConsumers(concurrentConsumers);
}
@@ -939,7 +956,7 @@
}
} else if (container instanceof SimpleMessageListenerContainer) {
// this includes SimpleMessageListenerContainer102
- SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)container;
+ SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
if (concurrentConsumers >= 0) {
listenerContainer.setConcurrentConsumers(concurrentConsumers);
}
@@ -963,7 +980,7 @@
// independently configured
JmsOperations operations = listener.getTemplate();
if (operations instanceof JmsTemplate) {
- JmsTemplate template = (JmsTemplate)operations;
+ JmsTemplate template = (JmsTemplate) operations;
template.setDeliveryPersistent(isReplyToDeliveryPersistent());
}
}
@@ -1038,9 +1055,9 @@
protected ConnectionFactory createTemplateConnectionFactory() {
return getConnectionFactory();
}
-
+
/**
- * Factory method which which allows derived classes to customize the lazy
+ * Factory method which which allows derived classes to customize the lazy
* transcationManager creation
*/
protected PlatformTransactionManager createTransactionManager() {
@@ -1108,10 +1125,10 @@
public void setReplyTo(String replyToDestination) {
if (!replyToDestination.startsWith(QUEUE_PREFIX)) {
throw new IllegalArgumentException("ReplyTo destination value has to be of type queue; "
- + "e.g: \"queue:replyQueue\"");
+ + "e.g: \"queue:replyQueue\"");
}
this.replyToDestination =
- removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()), '/');
+ removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()), '/');
}
public String getReplyToDestinationSelectorName() {
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java?rev=748503&r1=748502&r2=748503&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java Fri Feb 27 13:11:55 2009
@@ -21,7 +21,8 @@
*/
public final class JmsConstants {
- public static final String JMS_REPLY_DESTINATION = "JMSReplyTo"; //"CamelJmsReplyDestination";
+ public static final String JMS_REPLY_DESTINATION = "JMSReplyTo";
+
public static final String JMS_DESTINATION = "JMSDestination";
private JmsConstants() {
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=748503&r1=748502&r2=748503&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Feb 27 13:11:55 2009
@@ -19,12 +19,12 @@
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
-import javax.jms.JMSException;
import javax.jms.Topic;
-import javax.jms.Queue;
import org.apache.camel.Component;
import org.apache.camel.Exchange;
@@ -58,35 +58,6 @@
private JmsConfiguration configuration;
private Requestor requestor;
- /**
- * Returns a new JMS endpoint for the given JMS destination using the configuration from the given JMS component
- */
- public static JmsEndpoint newInstance(Destination destination, JmsComponent component) throws JMSException {
- JmsEndpoint answer = newInstance(destination);
- JmsConfiguration newConfiguration = component.getConfiguration().copy();
- answer.setConfiguration(newConfiguration);
- answer.setCamelContext(component.getCamelContext());
- return answer;
- }
-
- /**
- * Returns a new JMS endpoint for the given JMS destination
- */
- public static JmsEndpoint newInstance(Destination destination) throws JMSException {
- if (destination instanceof TemporaryQueue) {
- return new JmsTemporaryQueueEndpoint((TemporaryQueue) destination);
- }
- if (destination instanceof TemporaryTopic) {
- return new JmsTemporaryTopicEndpoint((TemporaryTopic) destination);
- }
- if (destination instanceof Queue) {
- return new JmsQueueEndpoint((Queue) destination);
- }
- else {
- return new JmsEndpoint((Topic) destination);
- }
- }
-
public JmsEndpoint() {
this(null, null);
}
@@ -122,6 +93,35 @@
this(endpointUri, destinationName, true);
}
+
+ /**
+ * Returns a new JMS endpoint for the given JMS destination using the configuration from the given JMS component
+ */
+ public static JmsEndpoint newInstance(Destination destination, JmsComponent component) throws JMSException {
+ JmsEndpoint answer = newInstance(destination);
+ JmsConfiguration newConfiguration = component.getConfiguration().copy();
+ answer.setConfiguration(newConfiguration);
+ answer.setCamelContext(component.getCamelContext());
+ return answer;
+ }
+
+ /**
+ * Returns a new JMS endpoint for the given JMS destination
+ */
+ public static JmsEndpoint newInstance(Destination destination) throws JMSException {
+ if (destination instanceof TemporaryQueue) {
+ return new JmsTemporaryQueueEndpoint((TemporaryQueue) destination);
+ }
+ if (destination instanceof TemporaryTopic) {
+ return new JmsTemporaryTopicEndpoint((TemporaryTopic) destination);
+ }
+ if (destination instanceof Queue) {
+ return new JmsQueueEndpoint((Queue) destination);
+ } else {
+ return new JmsEndpoint((Topic) destination);
+ }
+ }
+
public JmsProducer createProducer() throws Exception {
return new JmsProducer(this);
}
@@ -139,6 +139,7 @@
} else if (destination != null) {
jmsTemplate.setDefaultDestination(destination);
}
+ // TODO: Why is this destination resolver disabled for the producer? It's enabled for the consumer!
/*
else {
DestinationResolver resolver = getDestinationResolver();
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=748503&r1=748502&r2=748503&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Fri Feb 27 13:11:55 2009
@@ -58,7 +58,6 @@
private Requestor requestor;
private AtomicBoolean started = new AtomicBoolean(false);
-
private enum RequestorAffinity {
PER_COMPONENT(0),
PER_ENDPOINT(1),
@@ -67,7 +66,7 @@
private RequestorAffinity(int value) {
this.value = value;
}
- };
+ }
public JmsProducer(JmsEndpoint endpoint) {
super(endpoint);
@@ -75,9 +74,9 @@
JmsConfiguration c = endpoint.getConfiguration();
affinity = RequestorAffinity.PER_PRODUCER;
if (c.getReplyTo() != null) {
- if (c.getReplyToTempDestinationAffinity().equals(c.REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT)) {
+ if (c.getReplyToTempDestinationAffinity().equals(JmsConfiguration.REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT)) {
affinity = RequestorAffinity.PER_ENDPOINT;
- } else if (c.getReplyToTempDestinationAffinity().equals(c.REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT)) {
+ } else if (c.getReplyToTempDestinationAffinity().equals(JmsConfiguration.REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT)) {
affinity = RequestorAffinity.PER_COMPONENT;
}
}
@@ -197,9 +196,7 @@
if (destinationName != null) {
template.send(destinationName, messageCreator, callback);
} else if (destination != null) {
- // TODO cannot pass in callback using destination?
- template.send(destination.toString(), messageCreator, callback);
- // template.send(destination, messageCreator);
+ template.send(destination, messageCreator, callback);
} else {
throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
}
@@ -271,8 +268,7 @@
out.setMessageId(out.getJmsMessage().getJMSMessageID());
}
} catch (JMSException e) {
- LOG.warn("Unable to retrieve JMSMessageID from outgoing JMS Message and "
- + "set it into Camel's MessageId", e);
+ LOG.warn("Unable to retrieve JMSMessageID from outgoing JMS Message and set it into Camel's MessageId", e);
}
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=748503&r1=748502&r2=748503&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Fri Feb 27 13:11:55 2009
@@ -18,6 +18,8 @@
import java.util.Collections;
import java.util.List;
+import javax.jms.JMSException;
+import javax.jms.Queue;
import org.apache.camel.Exchange;
import org.apache.camel.spi.BrowsableEndpoint;
@@ -25,10 +27,6 @@
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsOperations;
-import javax.jms.Topic;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-
/**
* An endpoint for a JMS Queue which is also browsable
*
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestCustomReplyToTest.java (from r748488, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestCustomReplyToTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestCustomReplyToTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java&r1=748488&r2=748503&rev=748503&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestCustomReplyToTest.java Fri Feb 27 13:11:55 2009
@@ -20,7 +20,9 @@
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
@@ -30,19 +32,16 @@
import org.apache.commons.logging.LogFactory;
/**
- * A simple requesr / late reply test using InOptionalOut.
+ * A simple request/reply using custom reply to header.
*/
-public class JmsSimpleRequestLateReplyTest extends ContextTestSupport {
-
- private static final Log LOG = LogFactory.getLog(JmsSimpleRequestLateReplyTest.class);
+public class JmsSimpleRequestCustomReplyToTest extends ContextTestSupport {
+ private static final Log LOG = LogFactory.getLog(JmsSimpleRequestCustomReplyToTest.class);
+ private static String myReplyTo;
protected String componentName = "activemq";
+ private CountDownLatch latch = new CountDownLatch(1);
- private final CountDownLatch latch = new CountDownLatch(1);
- private static String replyDestination;
- private static String cid;
-
- public void testRequetLateReply() throws Exception {
+ public void testRequetCustomReplyTo() throws Exception {
// use another thread to send the late reply to simulate that we do it later, not
// from the original route anyway
Thread sender = new Thread(new SendLateReply());
@@ -53,17 +52,31 @@
Exchange out = template.request("activemq:queue:hello", new Processor() {
public void process(Exchange exchange) throws Exception {
- // we expect a response so InOut
- exchange.setPattern(ExchangePattern.InOut);
+ exchange.setPattern(ExchangePattern.InOnly);
+ exchange.getIn().setHeader("MyReplyQeueue", "foo");
exchange.getIn().setBody("Hello World");
}
});
result.assertIsSatisfied();
-
assertNotNull(out);
- // TODO: We should get this late reply to work
- //assertEquals("Late Reply", out.getOut().getBody());
+ assertNull(out.getOut(false));
+
+ // get the reply from the special reply queue
+ Endpoint end = context.getEndpoint(componentName + ":" + myReplyTo);
+ final Consumer consumer = end.createConsumer(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ assertEquals("Late reply", exchange.getIn().getBody());
+ latch.countDown();
+
+ }
+ });
+ // reset latch
+ latch = new CountDownLatch(1);
+ consumer.start();
+
+ latch.await();
+ consumer.stop();
}
private class SendLateReply implements Runnable {
@@ -80,11 +93,10 @@
}
LOG.debug("Sending late reply");
- template.send(componentName + ":" + replyDestination, new Processor() {
+ template.send(componentName + ":" + myReplyTo, new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.setPattern(ExchangePattern.InOnly);
exchange.getIn().setBody("Late reply");
- exchange.getIn().setHeader("JMSCorrelationID", cid);
}
});
}
@@ -104,16 +116,12 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // set the MEP to InOptionalOut as we might not be able to send a reply
- from(componentName + ":queue:hello").setExchangePattern(ExchangePattern.InOptionalOut).process(new Processor() {
+ from(componentName + ":queue:hello").process(new Processor() {
public void process(Exchange exchange) throws Exception {
assertEquals("Hello World", exchange.getIn().getBody());
- replyDestination = exchange.getProperty(JmsConstants.JMS_REPLY_DESTINATION, String.class);
- cid = exchange.getIn().getHeader("JMSCorrelationID", String.class);
-
- LOG.debug("ReplyDestination: " + replyDestination);
- LOG.debug("JMSCorrelationID: " + cid);
+ myReplyTo = exchange.getIn().getHeader("MyReplyQeueue", String.class);
+ LOG.debug("ReplyTo: " + myReplyTo);
LOG.debug("Ahh I cannot send a reply. Someone else must do it.");
latch.countDown();
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java?rev=748503&r1=748502&r2=748503&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsSimpleRequestLateReplyTest.java Fri Feb 27 13:11:55 2009
@@ -17,46 +17,38 @@
package org.apache.camel.component.jms;
import java.util.concurrent.CountDownLatch;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
import org.apache.camel.Message;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
-
/**
* A simple requesr / late reply test using InOptionalOut.
*/
public class JmsSimpleRequestLateReplyTest extends ContextTestSupport {
private static final Log LOG = LogFactory.getLog(JmsSimpleRequestLateReplyTest.class);
-
- protected String expectedBody = "Late Reply";
-
- private final CountDownLatch latch = new CountDownLatch(1);
private static Destination replyDestination;
private static String cid;
+ protected String expectedBody = "Late Reply";
protected ActiveMQComponent activeMQComponent;
+ private final CountDownLatch latch = new CountDownLatch(1);
public void testRequestLateReplyUsingCustomDestinationHeaderForReply() throws Exception {
- Runnable runnable = new SendLateReply();
- doTest(runnable);
-
+ doTest(new SendLateReply());
}
public void testRequestLateReplyUsingDestinationEndpointForReply() throws Exception {
- // use another thread to send the late reply to simulate that we do it later, not
- // from the origianl route anyway
doTest(new SendLateReplyUsingTemporaryEndpoint());
}
@@ -97,6 +89,7 @@
}
LOG.debug("Sending late reply");
+ // use some dummy queue as we override this with the property: JmsConstants.JMS_DESTINATION
template.send("activemq:dummy", new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.setPattern(ExchangePattern.InOnly);