You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/05/03 10:31:49 UTC
svn commit: r653015 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
components/camel-jms/src/main/java/org/apache/camel/component/jms/
components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/
components/camel-...
Author: ningjiang
Date: Sat May 3 01:31:48 2008
New Revision: 653015
URL: http://svn.apache.org/viewvc?rev=653015&view=rev
Log:
CAMEL-490 applied the patch with thanks to Marat
Added:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java (with props)
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java Sat May 3 01:31:48 2008
@@ -440,6 +440,17 @@
this.useEndpointCache = useEndpointCache;
}
+ public <T extends Endpoint<?>> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
+ Endpoint<?> e = null;
+ synchronized (endpointCache) {
+ e = endpointCache.get(endpointUri);
+ }
+ if (e != null && expectedClass.isAssignableFrom(e.getClass())) {
+ return expectedClass.asSubclass(expectedClass).cast(e);
+ }
+ return null;
+ }
+
// Implementation methods
// -----------------------------------------------------------------------
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Sat May 3 01:31:48 2008
@@ -35,6 +35,8 @@
* A JMS {@link MessageListener} which can be used to delegate processing to a
* Camel endpoint.
*
+ * Note that instance of this object has to be thread safe (reentrant)
+ *
* @version $Revision$ ;';;;
*/
public class EndpointMessageListener implements MessageListener {
@@ -111,7 +113,7 @@
this.eagerLoadingOfProperties = eagerLoadingOfProperties;
}
- public JmsOperations getTemplate() {
+ public synchronized JmsOperations getTemplate() {
if (template == null) {
template = endpoint.createInOnlyTemplate();
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Sat May 3 01:31:48 2008
@@ -128,7 +128,7 @@
boolean alwaysCopy = (endpoint != null) ? endpoint.getConfiguration().isAlwaysCopyMessage() : false;
if (!alwaysCopy && camelMessage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage)camelMessage;
- if (! jmsMessage.shouldCreateNewMessage()) {
+ if (!jmsMessage.shouldCreateNewMessage()) {
answer = jmsMessage.getJmsMessage();
}
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Sat May 3 01:31:48 2008
@@ -49,11 +49,6 @@
*/
public class JmsComponent extends DefaultComponent<JmsExchange> implements ApplicationContextAware {
- public static final String QUEUE_PREFIX = "queue:";
- public static final String TOPIC_PREFIX = "topic:";
- public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
- public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
-
private static final transient Log LOG = LogFactory.getLog(JmsComponent.class);
private static final String DEFAULT_QUEUE_BROWSE_STRATEGY = "org.apache.camel.component.jms.DefaultQueueBrowseStrategy";
private JmsConfiguration configuration;
@@ -365,20 +360,20 @@
boolean pubSubDomain = false;
boolean tempDestination = false;
- if (remaining.startsWith(QUEUE_PREFIX)) {
+ if (remaining.startsWith(JmsConfiguration.QUEUE_PREFIX)) {
pubSubDomain = false;
- remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
- } else if (remaining.startsWith(TOPIC_PREFIX)) {
+ remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.QUEUE_PREFIX.length()), '/');
+ } else if (remaining.startsWith(JmsConfiguration.TOPIC_PREFIX)) {
pubSubDomain = true;
- remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
- } else if (remaining.startsWith(TEMP_QUEUE_PREFIX)) {
+ remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TOPIC_PREFIX.length()), '/');
+ } else if (remaining.startsWith(JmsConfiguration.TEMP_QUEUE_PREFIX)) {
pubSubDomain = false;
tempDestination = true;
- remaining = removeStartingCharacters(remaining.substring(TEMP_QUEUE_PREFIX.length()), '/');
- } else if (remaining.startsWith(TEMP_TOPIC_PREFIX)) {
+ remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_QUEUE_PREFIX.length()), '/');
+ } else if (remaining.startsWith(JmsConfiguration.TEMP_TOPIC_PREFIX)) {
pubSubDomain = true;
tempDestination = true;
- remaining = removeStartingCharacters(remaining.substring(TEMP_TOPIC_PREFIX.length()), '/');
+ remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/');
}
final String subject = convertPathToActualDestination(remaining, parameters);
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sat May 3 01:31:48 2008
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.jms;
+import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
+
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
@@ -57,10 +59,20 @@
* @version $Revision$
*/
public class JmsConfiguration implements Cloneable {
+
+ public static final String QUEUE_PREFIX = "queue:";
+ public static final String TOPIC_PREFIX = "topic:";
+ public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
+ public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
+
protected static final String TRANSACTED = "TRANSACTED";
protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
+ protected static final String REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT = "component";
+ protected static final String REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT = "endpoint";
+ protected static final String REPLYTO_TEMP_DEST_AFFINITY_PER_PRODUCER = "producer";
+
private static final transient Log LOG = LogFactory.getLog(JmsConfiguration.class);
private JmsOperations jmsOperations;
private DestinationResolver destinationResolver;
@@ -87,6 +99,7 @@
private String cacheLevelName;
private long recoveryInterval = -1;
private long receiveTimeout = -1;
+ private long requestTimeout = 20000L;
private int idleTaskExecutionLimit = 1;
private int maxConcurrentConsumers = 1;
// JmsTemplate only
@@ -113,6 +126,14 @@
private boolean useMessageIDAsCorrelationID;
private JmsProviderMetadata providerMetadata = new JmsProviderMetadata();
private JmsOperations metadataJmsOperations;
+ // defines the component created temporary replyTo destination sharing strategy:
+ // possible values are: "component", "endpoint", "producer"
+ // component - a single temp queue is shared among all producers for a given component instance
+ // endpoint - a single temp queue is shared among all producers for a given endpoint instance
+ // producer - a single temp queue is created per producer
+ private String replyToTempDestinationAffinity = REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT;
+ private String replyToDestination;
+ private String replyToDestinationSelectorName;
public JmsConfiguration() {
}
@@ -846,6 +867,10 @@
}
}
+ if (endpoint.getSelector() != null && endpoint.getSelector().length() != 0) {
+ container.setMessageSelector(endpoint.getSelector());
+ }
+
if (container instanceof DefaultMessageListenerContainer) {
// this includes DefaultMessageListenerContainer102
DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
@@ -932,7 +957,8 @@
template.setDeliveryPersistent(isReplyToDeliveryPersistent());
}
}
- protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
+
+ public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
// TODO we could allow a spring container to auto-inject these objects?
switch (consumerType) {
case Simple:
@@ -1038,4 +1064,48 @@
public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
}
+
+ public String getReplyToTempDestinationAffinity() {
+ return replyToTempDestinationAffinity;
+ }
+
+ public void setReplyToTempDestinationAffinity(
+ String replyToTempDestinationAffinity) {
+ this.replyToTempDestinationAffinity = replyToTempDestinationAffinity;
+ }
+
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+ public String getReplyTo() {
+ return replyToDestination;
+ }
+
+ public void setReplyTo(String replyToDestination) {
+ if (!replyToDestination.startsWith(QUEUE_PREFIX)) {
+ throw new IllegalArgumentException("ReplyTo destination value has to be of type queue; "
+ + "e.g: \"queue:replyQueue\"");
+ }
+ this.replyToDestination =
+ removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()), '/');
+ }
+
+ public String getReplyToDestinationSelectorName() {
+ return replyToDestinationSelectorName;
+ }
+
+ public void setReplyToDestinationSelectorName(String replyToDestinationSelectorName) {
+ this.replyToDestinationSelectorName = replyToDestinationSelectorName;
+ // in case of consumer -> producer and a named replyTo correlation selector
+ // message passthough is impossible as we need to set the value of selector into
+ // outgoing message, which would be read-only if passthough were to remain enabled
+ if (replyToDestinationSelectorName != null) {
+ setAlwaysCopyMessage(true);
+ }
+ }
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Sat May 3 01:31:48 2008
@@ -42,7 +42,7 @@
private String selector;
private JmsConfiguration configuration;
private Requestor requestor;
- private long requestTimeout = 20000L;
+ private long requestTimeout;
public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
super(uri, component);
@@ -50,6 +50,7 @@
this.configuration = configuration;
this.destination = destination;
this.pubSubDomain = pubSubDomain;
+ this.requestTimeout = configuration.getRequestTimeout();
}
public JmsProducer createProducer() throws Exception {
@@ -86,9 +87,6 @@
public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
listenerContainer.setDestinationName(destination);
listenerContainer.setPubSubDomain(pubSubDomain);
- if (selector != null) {
- listenerContainer.setMessageSelector(selector);
- }
return new JmsConsumer(this, processor, listenerContainer);
}
@@ -165,7 +163,8 @@
public synchronized Requestor getRequestor() throws Exception {
if (requestor == null) {
- requestor = component.getRequestor();
+ requestor = new Requestor(getConfiguration(), getExecutorService());
+ requestor.start();
}
return requestor;
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java Sat May 3 01:31:48 2008
@@ -107,6 +107,11 @@
}
public void setJmsMessage(Message jmsMessage) {
+ try {
+ setMessageId(jmsMessage.getJMSMessageID());
+ } catch (JMSException e) {
+ LOG.warn("Unable to retrieve JMSMessageID from JMS Message", e);
+ }
this.jmsMessage = jmsMessage;
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Sat May 3 01:31:48 2008
@@ -19,6 +19,7 @@
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -27,9 +28,12 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap;
+import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor;
import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.impl.DefaultProducer;
@@ -50,10 +54,32 @@
private JmsOperations inOutTemplate;
private UuidGenerator uuidGenerator;
private DeferredRequestReplyMap deferredRequestReplyMap;
+ private Requestor requestor;
+ RequestorAffinity affinity;
+ private AtomicBoolean started = new AtomicBoolean(false);
+
+ private enum RequestorAffinity {
+ PER_COMPONENT(0),
+ PER_ENDPOINT(1),
+ PER_PRODUCER(2);
+ private int value;
+ private RequestorAffinity(int value) {
+ this.value = value;
+ }
+ };
public JmsProducer(JmsEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
+ JmsConfiguration c = endpoint.getConfiguration();
+ affinity = RequestorAffinity.PER_PRODUCER;
+ if (c.getReplyTo() != null) {
+ if (c.getReplyToTempDestinationAffinity().equals(c.REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT)) {
+ affinity = RequestorAffinity.PER_ENDPOINT;
+ } else if (c.getReplyToTempDestinationAffinity().equals(c.REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT)) {
+ affinity = RequestorAffinity.PER_COMPONENT;
+ }
+ }
}
public long getRequestTimeout() {
@@ -62,12 +88,57 @@
protected void doStart() throws Exception {
super.doStart();
- deferredRequestReplyMap = endpoint.getRequestor().getDeferredRequestReplyMap(this);
}
+ protected void testAndSetRequestor() throws RuntimeCamelException {
+ if (started.get() == false) {
+ synchronized (this) {
+ if (started.get() == true) {
+ return;
+ }
+ try {
+ JmsConfiguration c = endpoint.getConfiguration();
+ if (c.getReplyTo() != null) {
+ requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(),
+ endpoint.getExecutorService());
+ requestor.start();
+ } else {
+ if (affinity == RequestorAffinity.PER_PRODUCER) {
+ requestor = new Requestor(endpoint.getConfiguration(),
+ endpoint.getExecutorService());
+ requestor.start();
+ } else if (affinity == RequestorAffinity.PER_ENDPOINT) {
+ requestor = endpoint.getRequestor();
+ } else if (affinity == RequestorAffinity.PER_COMPONENT) {
+ requestor = ((JmsComponent)endpoint.getComponent()).getRequestor();
+ }
+ }
+ } catch (Exception e) {
+ throw new FailedToCreateProducerException(endpoint, e);
+ }
+ deferredRequestReplyMap = requestor.getDeferredRequestReplyMap(this);
+ started.set(true);
+ }
+ }
+ }
+
+ protected void testAndUnsetRequestor() throws Exception {
+ if (started.get() == true) {
+ synchronized (this) {
+ if (started.get() == false) {
+ return;
+ }
+ requestor.removeDeferredRequestReplyMap(this);
+ if (affinity == RequestorAffinity.PER_PRODUCER) {
+ requestor.stop();
+ }
+ started.set(false);
+ }
+ }
+ }
+
protected void doStop() throws Exception {
- endpoint.getRequestor().removeDeferredRequestReplyMap(this);
- deferredRequestReplyMap = null;
+ testAndUnsetRequestor();
super.doStop();
}
@@ -75,21 +146,20 @@
final org.apache.camel.Message in = exchange.getIn();
if (exchange.getPattern().isOutCapable()) {
- // create a temporary queue and consumer for responses...
+
+ testAndSetRequestor();
+
// note due to JMS transaction semantics we cannot use a single transaction
// for sending the request and receiving the response
- final Requestor requestor;
- try {
- requestor = endpoint.getRequestor();
- } catch (Exception e) {
- throw new RuntimeExchangeException(e, exchange);
- }
-
final Destination replyTo = requestor.getReplyTo();
-
+
+ if (replyTo == null) {
+ throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
+ }
+
final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
String correlationId = in.getHeader("JMSCorrelationID", String.class);
-
+
if (correlationId == null && !msgIdAsCorrId) {
in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
}
@@ -102,6 +172,7 @@
public Message createMessage(Session session) throws JMSException {
Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
message.setJMSReplyTo(replyTo);
+ requestor.setReplyToSelectorHeader(in, message);
FutureTask future = null;
future = (!msgIdAsCorrId)
@@ -116,6 +187,8 @@
return message;
}
}, callback);
+
+ setMessageId(exchange);
// lets wait and return the response
long requestTimeout = endpoint.getRequestTimeout();
@@ -159,9 +232,27 @@
return message;
}
});
+
+ setMessageId(exchange);
}
}
+ protected void setMessageId(Exchange exchange) {
+ if (!(exchange instanceof JmsExchange)) {
+ return;
+ }
+ try {
+ JmsExchange jmsExchange = JmsExchange.class.cast(exchange);
+ JmsMessage out = jmsExchange.getOut(false);
+ if (out != null) {
+ out.setMessageId(out.getJmsMessage().getJMSMessageID());
+ }
+ } catch (JMSException e) {
+ LOG.warn("Unable to retrieve JMSMessageID from outgoing JMS Message and "
+ + "set it into Camel's MessageId", e);
+ }
+ }
+
/**
* Preserved for backwards compatibility.
*
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java Sat May 3 01:31:48 2008
@@ -38,6 +38,7 @@
public static class DeferredMessageSentCallback implements MessageSentCallback {
private DeferredRequestReplyMap map;
private String transitionalID;
+ private Message message;
private Object monitor;
public DeferredMessageSentCallback(DeferredRequestReplyMap map, UuidGenerator uuidGenerator, Object monitor) {
@@ -54,7 +55,12 @@
return transitionalID;
}
+ public Message getMessage() {
+ return message;
+ }
+
public void sent(Message message) {
+ this.message = message;
map.processDeferredReplies(monitor, getID(), message);
}
}
@@ -124,6 +130,7 @@
}
deferredRequestMap.remove(transitionalID);
String correlationID = outMessage.getJMSMessageID();
+ // System.out.println("DeferredRequestReplyMap.processDeferredReplies: sent messageID = " + correlationID);
Object in = deferredReplyMap.get(correlationID);
if (in != null && in instanceof Message) {
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java Sat May 3 01:31:48 2008
@@ -28,9 +28,10 @@
*
* @version $Revision$
*/
-public class FutureHandler extends FutureTask implements ReplyHandler {
- private static final Callable EMPTY_CALLABLE = new Callable() {
- public Object call() throws Exception {
+public class FutureHandler extends FutureTask<Message> implements ReplyHandler {
+
+ private static final Callable<Message> EMPTY_CALLABLE = new Callable<Message>() {
+ public Message call() throws Exception {
return null;
}
};
@@ -39,7 +40,7 @@
super(EMPTY_CALLABLE);
}
- public synchronized void set(Object result) {
+ public synchronized void set(Message result) {
super.set(result);
}
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java Sat May 3 01:31:48 2008
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MessageSelectorProvider {
+ protected Map<String, String> correlationIds;
+ protected boolean dirty = true;
+ protected StringBuilder expression;
+
+ public MessageSelectorProvider() {
+ correlationIds = new HashMap<String, String>();
+ }
+
+ public synchronized void addCorrelationID(String id) {
+ correlationIds.put(id, id);
+ dirty = true;
+ }
+
+ public synchronized void removeCorrelationID(String id) {
+ correlationIds.remove(id);
+ dirty = true;
+ }
+
+ public synchronized String get() {
+ if (!dirty) {
+ return expression.toString();
+ }
+ expression = new StringBuilder("JMSCorrelationID='");
+ boolean first = true;
+ for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
+ if (!first) {
+ expression.append(" OR JMSCorrelationID='");
+ }
+ expression.append(entry.getValue()).append("'");
+ if (first) {
+ first = false;
+ }
+ }
+ dirty = false;
+ return expression.toString();
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java Sat May 3 01:31:48 2008
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.component.jms.JmsProducer;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
+import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor.MessageSelectorComposer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.transaction.PlatformTransactionManager;
+
+public class PersistentReplyToFutureHandler extends FutureHandler {
+
+ private static final transient Log LOG = LogFactory.getLog(PersistentReplyToFutureHandler.class);
+ protected PersistentReplyToRequestor requestor;
+ protected DeferredMessageSentCallback callback;
+ protected String correlationID;
+
+ public PersistentReplyToFutureHandler(PersistentReplyToRequestor requestor,
+ String correlationID) {
+ super();
+ this.requestor = requestor;
+ this.correlationID = correlationID;
+ }
+
+ public PersistentReplyToFutureHandler(PersistentReplyToRequestor requestor,
+ DeferredMessageSentCallback callback) {
+ super();
+ this.requestor = requestor;
+ this.callback = callback;
+ }
+
+ @Override
+ public Message get() throws InterruptedException, ExecutionException {
+ Message result = null;
+ try {
+ updateSelector();
+ result = super.get();
+ } finally {
+ revertSelector();
+ }
+ return result;
+ }
+
+ @Override
+ public Message get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ Message result = null;
+ try {
+ updateSelector();
+ result = super.get(timeout, unit);
+ } finally {
+ revertSelector();
+ }
+ return result;
+ }
+
+ protected void updateSelector() throws ExecutionException {
+ try {
+ MessageSelectorComposer composer = (MessageSelectorComposer)requestor.getListenerContainer();
+ composer.addCorrelationID((correlationID != null) ? correlationID : callback.getMessage().getJMSMessageID());
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ }
+
+ protected void revertSelector() throws ExecutionException {
+ try {
+ MessageSelectorComposer composer = (MessageSelectorComposer)requestor.getListenerContainer();
+ composer.removeCorrelationID((correlationID != null) ? correlationID : callback.getMessage().getJMSMessageID());
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java Sat May 3 01:31:48 2008
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.math.BigInteger;
+import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
+
+public class PersistentReplyToRequestor extends Requestor {
+ private String replyToSelectorValue;
+
+ public class DestinationResolverDelegate implements DestinationResolver {
+ private DestinationResolver delegate;
+ private Destination destination;
+
+ public DestinationResolverDelegate(DestinationResolver delegate) {
+ this.delegate = delegate;
+ }
+
+ public Destination resolveDestinationName(Session session, String destinationName,
+ boolean pubSubDomain) throws JMSException {
+ synchronized (getOutterInstance()) {
+ try {
+ if (destination == null) {
+ destination = delegate.resolveDestinationName(session, destinationName, pubSubDomain);
+ setReplyTo(destination);
+ }
+ } finally {
+ getOutterInstance().notifyAll();
+ }
+ }
+ return destination;
+ }
+ };
+
+ public static interface MessageSelectorComposer {
+ void addCorrelationID(String id);
+ void removeCorrelationID(String id);
+ }
+
+ public static class CamelDefaultMessageListenerContainer102 extends DefaultMessageListenerContainer102
+ implements MessageSelectorComposer {
+ MessageSelectorProvider provider = new MessageSelectorProvider();
+
+ public void addCorrelationID(String id) {
+ provider.addCorrelationID(id);
+ }
+
+ public void removeCorrelationID(String id) {
+ provider.removeCorrelationID(id);
+ }
+
+ @Override
+ public void setMessageSelector(String messageSelector) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getMessageSelector() {
+ return provider.get();
+ }
+ }
+
+ public static class CamelDefaultMessageListenerContainer extends DefaultMessageListenerContainer
+ implements MessageSelectorComposer {
+
+ MessageSelectorProvider provider = new MessageSelectorProvider();
+
+ public void addCorrelationID(String id) {
+ provider.addCorrelationID(id);
+ }
+
+ public void removeCorrelationID(String id) {
+ provider.removeCorrelationID(id);
+ }
+
+ @Override
+ public void setMessageSelector(String messageSelector) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getMessageSelector() {
+ return provider.get();
+ }
+ }
+
+ public PersistentReplyToRequestor(JmsConfiguration configuration,
+ ScheduledExecutorService executorService) {
+ super(configuration, executorService);
+ }
+
+
+ @Override
+ protected FutureHandler createFutureHandler(String correlationID) {
+ boolean dynamicSelector = (getConfiguration().getReplyToDestinationSelectorName() == null);
+ if (dynamicSelector) {
+ return new PersistentReplyToFutureHandler(this, correlationID);
+ }
+ return new FutureHandler();
+ }
+
+ @Override
+ protected FutureHandler createFutureHandler(DeferredMessageSentCallback callback) {
+ boolean dynamicSelector = (getConfiguration().getReplyToDestinationSelectorName() == null);
+ if (dynamicSelector) {
+ return new PersistentReplyToFutureHandler(this, callback);
+ }
+ return new FutureHandler();
+ }
+
+ @Override
+ public AbstractMessageListenerContainer createListenerContainer() {
+ JmsConfiguration config = getConfiguration();
+ String replyToSelectorName = getConfiguration().getReplyToDestinationSelectorName();
+
+ AbstractMessageListenerContainer container =
+ config.isUseVersion102() ?
+ (replyToSelectorName != null) ? new DefaultMessageListenerContainer102()
+ : new CamelDefaultMessageListenerContainer102()
+ : (replyToSelectorName != null) ? new DefaultMessageListenerContainer()
+ : new CamelDefaultMessageListenerContainer();
+
+ container.setConnectionFactory(config.getListenerConnectionFactory());
+
+ DestinationResolver resolver = config.getDestinationResolver();
+ if (resolver == null) {
+ resolver = container.getDestinationResolver();
+ }
+
+ container.setDestinationResolver(new DestinationResolverDelegate(resolver));
+ container.setDestinationName(getConfiguration().getReplyTo());
+
+ if (replyToSelectorName != null) {
+ replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
+ container.setMessageSelector(replyToSelectorName + "='" + replyToSelectorValue + "'");
+ } else {
+ ((MessageSelectorComposer)container).addCorrelationID("ID:" + new BigInteger(24 * 8, new Random()).toString(16));
+ }
+
+ container.setAutoStartup(true);
+ container.setMessageListener(this);
+ container.setPubSubDomain(false);
+ container.setSubscriptionDurable(false);
+ ExceptionListener exceptionListener = config.getExceptionListener();
+ if (exceptionListener != null) {
+ container.setExceptionListener(exceptionListener);
+ }
+ container.setSessionTransacted(config.isTransacted());
+ if (config.isTransacted()) {
+ container.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
+ } else {
+ if (config.getAcknowledgementMode() >= 0) {
+ container.setSessionAcknowledgeMode(config.getAcknowledgementMode());
+ } else if (config.getAcknowledgementModeName() != null) {
+ container.setSessionAcknowledgeModeName(config.getAcknowledgementModeName());
+ }
+ }
+ if (container instanceof DefaultMessageListenerContainer) {
+ DefaultMessageListenerContainer defContainer = (DefaultMessageListenerContainer)container;
+ defContainer.setConcurrentConsumers(1);
+ defContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
+
+ if (config.getReceiveTimeout() >= 0) {
+ defContainer.setReceiveTimeout(config.getReceiveTimeout());
+ }
+ if (config.getRecoveryInterval() >= 0) {
+ defContainer.setRecoveryInterval(config.getRecoveryInterval());
+ }
+ TaskExecutor taskExecutor = config.getTaskExecutor();
+ if (taskExecutor != null) {
+ defContainer.setTaskExecutor(taskExecutor);
+ }
+ PlatformTransactionManager tm = config.getTransactionManager();
+ if (tm != null) {
+ defContainer.setTransactionManager(tm);
+ } else if (config.isTransacted()) {
+ throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
+ }
+ if (config.getTransactionName() != null) {
+ defContainer.setTransactionName(config.getTransactionName());
+ }
+ if (config.getTransactionTimeout() >= 0) {
+ defContainer.setTransactionTimeout(config.getTransactionTimeout());
+ }
+ }
+ return container;
+ }
+
+ @Override
+ public void setReplyToSelectorHeader(org.apache.camel.Message in, Message jmsIn) throws JMSException {
+ String replyToSelectorName = getConfiguration().getReplyToDestinationSelectorName();
+ if (replyToSelectorValue != null) {
+ in.setHeader(replyToSelectorName, replyToSelectorValue);
+ jmsIn.setStringProperty(replyToSelectorName, replyToSelectorValue);
+ }
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Sat May 3 01:31:48 2008
@@ -56,9 +56,10 @@
private TimeoutMap requestMap;
private Map<JmsProducer, DeferredRequestReplyMap> producerDeferredRequestReplyMap;
private TimeoutMap deferredRequestMap;
- private TimeoutMap deferredReplyMap;
+ private TimeoutMap deferredReplyMap;
private Destination replyTo;
private long maxRequestTimeout = -1;
+ private long replyToResolverTimeout = 5000;
public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
@@ -75,17 +76,21 @@
if (map == null) {
map = new DeferredRequestReplyMap(this, producer, deferredRequestMap, deferredReplyMap);
producerDeferredRequestReplyMap.put(producer, map);
- }
- if (maxRequestTimeout == -1) {
- maxRequestTimeout = producer.getRequestTimeout();
- } else if (maxRequestTimeout < producer.getRequestTimeout()) {
- maxRequestTimeout = producer.getRequestTimeout();
+ if (maxRequestTimeout == -1) {
+ maxRequestTimeout = producer.getRequestTimeout();
+ } else if (maxRequestTimeout < producer.getRequestTimeout()) {
+ maxRequestTimeout = producer.getRequestTimeout();
+ }
}
return map;
}
public synchronized void removeDeferredRequestReplyMap(JmsProducer producer) {
- producerDeferredRequestReplyMap.remove(producer);
+ DeferredRequestReplyMap map = producerDeferredRequestReplyMap.remove(producer);
+ if (map == null) {
+ // already removed;
+ return;
+ }
if (maxRequestTimeout == producer.getRequestTimeout()) {
long max = -1;
for (Map.Entry<JmsProducer, DeferredRequestReplyMap> entry : producerDeferredRequestReplyMap.entrySet()) {
@@ -114,26 +119,30 @@
}
public FutureTask getReceiveFuture(String correlationID, long requestTimeout) {
- FutureTask future = null;
-
- if (future == null) {
- FutureHandler futureHandler = new FutureHandler();
- future = futureHandler;
- requestMap.put(correlationID, futureHandler, requestTimeout);
- }
+ FutureHandler future = createFutureHandler(correlationID);
+ requestMap.put(correlationID, future, requestTimeout);
return future;
}
public FutureTask getReceiveFuture(DeferredMessageSentCallback callback) {
- FutureTask future = new FutureHandler();
+ FutureHandler future = createFutureHandler(callback);
DeferredRequestReplyMap map = callback.getDeferredRequestReplyMap();
map.put(callback, future);
return future;
}
+
+ protected FutureHandler createFutureHandler(String correlationID) {
+ return new FutureHandler();
+ }
+
+ protected FutureHandler createFutureHandler(DeferredMessageSentCallback callback) {
+ return new FutureHandler();
+ }
public void onMessage(Message message) {
try {
String correlationID = message.getJMSCorrelationID();
+ // System.out.println("Requestor.onMessage: correlationID " + correlationID);
if (correlationID == null) {
LOG.warn("Ignoring message with no correlationID! " + message);
return;
@@ -169,6 +178,15 @@
}
public Destination getReplyTo() {
+ synchronized(this) {
+ try {
+ if (replyTo == null) {
+ wait(replyToResolverTimeout);
+ }
+ } catch (Throwable e) {
+ // eat it
+ }
+ }
return replyTo;
}
@@ -179,11 +197,13 @@
// Implementation methods
//-------------------------------------------------------------------------
+ @Override
protected void doStart() throws Exception {
AbstractMessageListenerContainer container = getListenerContainer();
container.afterPropertiesSet();
}
+ @Override
protected void doStop() throws Exception {
if (listenerContainer != null) {
listenerContainer.stop();
@@ -191,6 +211,10 @@
}
}
+ protected Requestor getOutterInstance() {
+ return this;
+ }
+
protected AbstractMessageListenerContainer createListenerContainer() {
SimpleMessageListenerContainer answer = configuration.isUseVersion102()
? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
@@ -199,8 +223,15 @@
public Destination resolveDestinationName(Session session, String destinationName,
boolean pubSubDomain) throws JMSException {
- TemporaryQueue queue = session.createTemporaryQueue();
- replyTo = queue;
+ TemporaryQueue queue = null;
+ synchronized (getOutterInstance()) {
+ try {
+ queue = session.createTemporaryQueue();
+ setReplyTo(queue);
+ } finally {
+ getOutterInstance().notifyAll();
+ }
+ }
return queue;
}
});
@@ -232,4 +263,12 @@
}
return uuidGenerator;
}
+
+ protected JmsConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ public void setReplyToSelectorHeader(org.apache.camel.Message in, Message jmsIn) throws JMSException {
+ // complete
+ }
}
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java Sat May 3 01:31:48 2008
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.jms;
-import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,9 +27,9 @@
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
@@ -38,15 +37,20 @@
* @version $Revision$
*/
public class JmsRouteRequestReplyTest extends ContextTestSupport {
+ protected static String REPLY_TO_DESTINATION_SELECTOR_NAME = "camelProducer";
protected static String componentName = "amq";
protected static String componentName1 = "amq1";
protected static String endpoingUriA = componentName + ":queue:test.a";
protected static String endpointUriB = componentName + ":queue:test.b";
protected static String endpointUriB1 = componentName1 + ":queue:test.b";
+ // note that the replyTo both A and B endpoints share the persistent replyTo queue,
+ // which is one more way to verify that reply listeners of A and B endpoints don't steal each other messages
+ protected static String endpoingtReplyToUriA = componentName + ":queue:test.a?replyTo=queue:test.a.reply";
+ protected static String endpoingtReplyToUriB = componentName + ":queue:test.b?replyTo=queue:test.a.reply";
protected static String request = "Hello World";
protected static String expectedReply = "Re: " + request;
protected static int maxTasks = 100;
- protected static int maxServerTasks = maxTasks / 5;
+ protected static int maxServerTasks = 1/*maxTasks / 5*/;
protected static int maxCalls = 10;
protected static AtomicBoolean inited = new AtomicBoolean(false);
protected static Map<String, ContextBuilder> contextBuilders = new HashMap<String, ContextBuilder>();
@@ -55,7 +59,7 @@
private interface ContextBuilder {
CamelContext buildContext(CamelContext context) throws Exception;
}
-
+
public static class SingleNodeDeadEndRouteBuilder extends RouteBuilder {
public void configure() throws Exception {
from(endpoingUriA).process(new Processor() {
@@ -89,6 +93,22 @@
}
};
+ public static class MultiNodeReplyToRouteBuilder extends RouteBuilder {
+ public void configure() throws Exception {
+ from(endpoingtReplyToUriA).to(endpoingtReplyToUriB);
+ from(endpointUriB).process(new Processor() {
+ public void process(Exchange e) {
+ Message in = e.getIn();
+ Message out = e.getOut(true);
+ String selectorValue = in.getHeader(REPLY_TO_DESTINATION_SELECTOR_NAME, String.class);
+ String request = in.getBody(String.class);
+ out.setHeader(REPLY_TO_DESTINATION_SELECTOR_NAME, selectorValue);
+ out.setBody(expectedReply + request.substring(request.indexOf('-')));
+ }
+ });
+ }
+ };
+
public static class MultiNodeDiffCompRouteBuilder extends RouteBuilder {
public void configure() throws Exception {
from(endpoingUriA).to(endpointUriB1);
@@ -100,34 +120,88 @@
});
}
};
+
+ public static class ContextBuilderMessageID implements ContextBuilder {
+ public CamelContext buildContext(CamelContext context) throws Exception {
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+ jmsComponent.setUseMessageIDAsCorrelationID(true);
+ jmsComponent.setConcurrentConsumers(maxServerTasks);
+ /*
+ jmsComponent.getConfiguration().setRequestTimeout(600000);
+ jmsComponent.getConfiguration().setRequestMapPurgePollTimeMillis(30000);
+ */
+ context.addComponent(componentName, jmsComponent);
+ return context;
+ }
+ };
+
+ public static class ContextBuilderMessageIDReplyToTempDestinationAffinity extends ContextBuilderMessageID {
+ private String affinity;
+ public ContextBuilderMessageIDReplyToTempDestinationAffinity(String affinity) {
+ this.affinity = affinity;
+ }
+ public CamelContext buildContext(CamelContext context) throws Exception {
+ super.buildContext(context);
+ JmsComponent component = context.getComponent(componentName, JmsComponent.class);
+ component.getConfiguration().setReplyToTempDestinationAffinity(affinity);
+ return context;
+ }
+ }
protected static void init() {
if (inited.compareAndSet(false, true)) {
- ContextBuilder contextBuilderMessageID = new ContextBuilder() {
+ ContextBuilder contextBuilderMessageID = new ContextBuilderMessageID();
+ ContextBuilder contextBuilderMessageIDReplyToTempDestinationPerComponent =
+ new ContextBuilderMessageIDReplyToTempDestinationAffinity("component");
+ ContextBuilder contextBuilderMessageIDReplyToTempDestinationPerProducer =
+ new ContextBuilderMessageIDReplyToTempDestinationAffinity("producer");
+
+ ContextBuilder contextBuilderCorrelationID = new ContextBuilder() {
public CamelContext buildContext(CamelContext context) throws Exception {
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
- jmsComponent.setUseMessageIDAsCorrelationID(true);
+ jmsComponent.setUseMessageIDAsCorrelationID(false);
jmsComponent.setConcurrentConsumers(maxServerTasks);
+ /*
+ jmsComponent.getConfiguration().setRequestTimeout(600000);
+ jmsComponent.getConfiguration().setRequestMapPurgePollTimeMillis(60000);
+ */
context.addComponent(componentName, jmsComponent);
return context;
}
};
- ContextBuilder contextBuilderCorrelationID = new ContextBuilder() {
+ ContextBuilder contextBuilderMessageIDNamedReplyToSelector = new ContextBuilder() {
+ public CamelContext buildContext(CamelContext context) throws Exception {
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+ jmsComponent.setUseMessageIDAsCorrelationID(true);
+ jmsComponent.setConcurrentConsumers(maxServerTasks);
+ jmsComponent.getConfiguration().setReplyToDestinationSelectorName(REPLY_TO_DESTINATION_SELECTOR_NAME);
+ context.addComponent(componentName, jmsComponent);
+ return context;
+ }
+ };
+
+ ContextBuilder contextBuilderCorrelationIDNamedReplyToSelector = new ContextBuilder() {
public CamelContext buildContext(CamelContext context) throws Exception {
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
jmsComponent.setUseMessageIDAsCorrelationID(false);
jmsComponent.setConcurrentConsumers(maxServerTasks);
+ jmsComponent.getConfiguration().setReplyToDestinationSelectorName(REPLY_TO_DESTINATION_SELECTOR_NAME);
context.addComponent(componentName, jmsComponent);
return context;
}
};
+
ContextBuilder contextBuilderCorrelationIDDiffComp = new ContextBuilder() {
public CamelContext buildContext(CamelContext context) throws Exception {
ConnectionFactory connectionFactory =
@@ -144,20 +218,71 @@
}
};
+ ContextBuilder contextBuilderMessageIDDiffComp = new ContextBuilder() {
+ public CamelContext buildContext(CamelContext context) throws Exception {
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+ jmsComponent.setUseMessageIDAsCorrelationID(true);
+ jmsComponent.setConcurrentConsumers(maxServerTasks);
+ context.addComponent(componentName, jmsComponent);
+ jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+ jmsComponent.setUseMessageIDAsCorrelationID(true);
+ jmsComponent.setConcurrentConsumers(maxServerTasks);
+ context.addComponent(componentName1, jmsComponent);
+ return context;
+ }
+ };
+
contextBuilders.put("testUseMessageIDAsCorrelationID", contextBuilderMessageID);
+ contextBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerComponent",
+ contextBuilderMessageIDReplyToTempDestinationPerComponent);
+ contextBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerProducer",
+ contextBuilderMessageIDReplyToTempDestinationPerProducer);
+
contextBuilders.put("testUseCorrelationID", contextBuilderCorrelationID);
contextBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", contextBuilderMessageID);
contextBuilders.put("testUseCorrelationIDMultiNode", contextBuilderCorrelationID);
+
+ contextBuilders.put("testUseMessageIDAsCorrelationIDPersistReplyToMultiNode", contextBuilderMessageID);
+ contextBuilders.put("testUseCorrelationIDPersistReplyToMultiNode", contextBuilderCorrelationID);
+
+ contextBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToMultiNode", contextBuilderMessageID);
+ contextBuilders.put("testUseCorrelationIDPersistMultiReplyToMultiNode", contextBuilderCorrelationID);
+
+ contextBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode",
+ contextBuilderMessageIDNamedReplyToSelector);
+
+ contextBuilders.put("testUseCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode",
+ contextBuilderCorrelationIDNamedReplyToSelector);
+
contextBuilders.put("testUseCorrelationIDMultiNodeDiffComponents", contextBuilderCorrelationIDDiffComp);
+ contextBuilders.put("testUseMessageIDAsCorrelationIDMultiNodeDiffComponents", contextBuilderMessageIDDiffComp);
contextBuilders.put("testUseMessageIDAsCorrelationIDTimeout", contextBuilderMessageID);
contextBuilders.put("testUseCorrelationIDTimeout", contextBuilderMessageID);
routeBuilders.put("testUseMessageIDAsCorrelationID", new SingleNodeRouteBuilder());
+ routeBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerComponent", new SingleNodeRouteBuilder());
+ routeBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerProducer", new SingleNodeRouteBuilder());
routeBuilders.put("testUseCorrelationID", new SingleNodeRouteBuilder());
routeBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", new MultiNodeRouteBuilder());
routeBuilders.put("testUseCorrelationIDMultiNode", new MultiNodeRouteBuilder());
+
+ routeBuilders.put("testUseMessageIDAsCorrelationIDPersistReplyToMultiNode", new MultiNodeRouteBuilder());
+ routeBuilders.put("testUseCorrelationIDPersistReplyToMultiNode", new MultiNodeRouteBuilder());
+
+ routeBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToMultiNode", new MultiNodeReplyToRouteBuilder());
+ routeBuilders.put("testUseCorrelationIDPersistMultiReplyToMultiNode", new MultiNodeReplyToRouteBuilder());
+
+ routeBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode",
+ new MultiNodeReplyToRouteBuilder());
+
+ routeBuilders.put("testUseCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode",
+ new MultiNodeReplyToRouteBuilder());
+
routeBuilders.put("testUseCorrelationIDMultiNodeDiffComponents", new MultiNodeDiffCompRouteBuilder());
+ routeBuilders.put("testUseMessageIDAsCorrelationIDMultiNodeDiffComponents", new MultiNodeDiffCompRouteBuilder());
routeBuilders.put("testUseMessageIDAsCorrelationIDTimeout", new SingleNodeDeadEndRouteBuilder());
routeBuilders.put("testUseCorrelationIDTimeout", new SingleNodeDeadEndRouteBuilder());
}
@@ -165,17 +290,19 @@
public class Task extends Thread {
private AtomicInteger counter;
+ private String fromUri;
private boolean ok = true;
private String message = "";
- public Task(AtomicInteger counter) {
+ public Task(AtomicInteger counter, String fromUri) {
this.counter = counter;
+ this.fromUri = fromUri;
}
public void run() {
for (int i = 0; i < maxCalls; i++) {
int callId = counter.incrementAndGet();
- Object reply = template.requestBody(endpoingUriA, request + "-" + callId);
+ Object reply = template.requestBody(fromUri, request + "-" + callId);
if (!reply.equals(expectedReply + "-" + callId)) {
ok = false;
message = "Unexpected reply. Expected: '" + expectedReply + "-" + callId
@@ -195,61 +322,131 @@
}
public void testUseMessageIDAsCorrelationID() throws Exception {
- runRequestReplyThreaded();
+ runRequestReplyThreaded(endpoingUriA);
+ }
+
+ public void testUseMessageIDAsCorrelationIDReplyToTempDestinationPerComponent() throws Exception {
+ runRequestReplyThreaded(endpoingUriA);
+ }
+
+ public void testUseMessageIDAsCorrelationIDReplyToTempDestinationPerProducer() throws Exception {
+ runRequestReplyThreaded(endpoingUriA);
}
public void testUseCorrelationID() throws Exception {
- runRequestReplyThreaded();
+ runRequestReplyThreaded(endpoingUriA);
}
public void testUseMessageIDAsCorrelationIDMultiNode() throws Exception {
- runRequestReplyThreaded();
+ runRequestReplyThreaded(endpoingUriA);
+ }
+
+ public void testUseCorrelationIDMultiNode() throws Exception {
+ runRequestReplyThreaded(endpoingUriA);
+ }
+
+ public void testUseMessageIDAsCorrelationIDPersistReplyToMultiNode() throws Exception {
+ runRequestReplyThreaded(endpoingtReplyToUriA);
+ }
+
+ public void testUseCorrelationIDPersistReplyToMultiNode() throws Exception {
+ runRequestReplyThreaded(endpoingUriA);
}
+ // (1)
+ // note this is an inefficient way of correlating replies to a persistent queue
+ // a consumer will have to be created for each reply message
+ // see testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode
+ // or testCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode
+ // for a faster way to do this. Note however that in this case the message copy has to occur
+ // between consumer -> producer as the selector value needs to be propagated to the ultimate
+ // destination, which in turn will copy this value back into the reply message
+ public void testUseMessageIDAsCorrelationIDPersistMultiReplyToMultiNode() throws Exception {
+ int oldMaxTasks = maxTasks;
+ int oldMaxServerTasks = maxServerTasks;
+ int oldMaxCalls = maxCalls;
+
+ maxTasks = 10;
+ maxServerTasks = 1;
+ maxCalls = 2;
+
+ try {
+ runRequestReplyThreaded(endpoingUriA);
+ } finally {
+ maxTasks = oldMaxTasks;
+ maxServerTasks = oldMaxServerTasks;
+ maxCalls = oldMaxCalls;
+ }
+ }
+
+ // see (1)
+ public void testUseCorrelationIDPersistMultiReplyToMultiNode() throws Exception {
+ int oldMaxTasks = maxTasks;
+ int oldMaxServerTasks = maxServerTasks;
+ int oldMaxCalls = maxCalls;
+
+ maxTasks = 10;
+ maxServerTasks = 1;
+ maxCalls = 2;
+
+ try {
+ runRequestReplyThreaded(endpoingUriA);
+ } finally {
+ maxTasks = oldMaxTasks;
+ maxServerTasks = oldMaxServerTasks;
+ maxCalls = oldMaxCalls;
+ }
+ }
+
+ public void testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode() throws Exception {
+ runRequestReplyThreaded(endpoingUriA);
+ }
+
+ public void testUseCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode() throws Exception {
+ runRequestReplyThreaded(endpoingUriA);
+ }
+
public void testUseCorrelationIDTimeout() throws Exception {
+ JmsComponent c = (JmsComponent)context.getComponent(componentName);
+ c.getConfiguration().setRequestTimeout(1000);
+ c.getConfiguration().setRequestMapPurgePollTimeMillis(1000);
+
Object reply = template.requestBody(endpoingUriA, request);
assertEquals(reply, request);
- JmsComponent c = (JmsComponent)context.getComponent(componentName);
+
+ JmsEndpoint endpoint = template.getResolvedEndpoint(endpoingUriA, JmsEndpoint.class);
// Wait 1 extra purge cycle to make sure that TimeoutMap had a chance to cleanup
- Thread.sleep(c.getConfiguration().getRequestMapPurgePollTimeMillis());
- assertTrue(c.getRequestor().getRequestMap().size() == 0);
+ Thread.sleep(endpoint.getConfiguration().getRequestMapPurgePollTimeMillis());
+ assertTrue(endpoint.getRequestor().getRequestMap().size() == 0);
}
public void testUseMessageIDAsCorrelationIDTimeout() throws Exception {
JmsComponent c = (JmsComponent)context.getComponent(componentName);
+ c.getConfiguration().setRequestTimeout(1000);
+ c.getConfiguration().setRequestMapPurgePollTimeMillis(1000);
+
Object reply = template.requestBody(endpoingUriA, request);
assertEquals(reply, request);
+
+ JmsEndpoint endpoint = template.getResolvedEndpoint(endpoingUriA, JmsEndpoint.class);
// Wait 1 extra purge cycle to make sure that TimeoutMap had a chance to cleanup
- Thread.sleep(c.getConfiguration().getRequestMapPurgePollTimeMillis());
- assertTrue(c.getRequestor().getDeferredRequestMap().size() == 0);
+ Thread.sleep(endpoint.getConfiguration().getRequestMapPurgePollTimeMillis());
+ assertTrue(endpoint.getRequestor().getDeferredRequestMap().size() == 0);
}
public void testUseCorrelationIDMultiNodeDiffComponents() throws Exception {
- runRequestReplyThreaded();
+ runRequestReplyThreaded(endpoingUriA);
}
- /*
- * REVISIT: This currently fails because there is a single instance of Requestor per JmsComponent
- * which shares requestMap amongst JmsProducers. This is a problem in case where the same correlationID
- * value travels between nodes serviced by the same JmsComponent:
- * client -> producer1 -> corrId -> consumer1 -> producer2 -> corrId -> consumer
- * producer1 (Bum! @) <- corrId <- consumer1 <- producer2 <- corrId <- reply
- *
- * @ - The request entry for corrId was already removed from JmsProducer shared requestMap
- *
- * Possible ways to solve this: Each JmsProducer gets its own replyTo destination
- *
-
- public void testUseCorrelationIDMultiNode() throws Exception {
- runRequestReplyThreaded();
- }
- */
-
- protected void runRequestReplyThreaded() throws Exception {
+ public void testUseMessageIDAsCorrelationIDMultiNodeDiffComponents() throws Exception {
+ runRequestReplyThreaded(endpoingUriA);
+ }
+
+ protected void runRequestReplyThreaded(String fromUri) throws Exception {
final AtomicInteger counter = new AtomicInteger(-1);
Task[] tasks = new Task[maxTasks];
for (int i = 0; i < maxTasks; ++i) {
- Task task = new Task(counter);
+ Task task = new Task(counter, fromUri);
tasks[i] = task;
task.start();
}
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java Sat May 3 01:31:48 2008
@@ -30,9 +30,15 @@
private Logger log = Logger.getLogger(getClass());
private int count;
+ public ConditionalExceptionProcessor() {
+
+ }
+
public void process(Exchange exchange) throws Exception {
setCount(getCount() + 1);
+
+ // System.out.println(this + "; getCount() = " + getCount());
AbstractTransactionTest
.assertTrue(
Added: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java Sat May 3 01:31:48 2008
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jms.JmsComponent;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.apache.log4j.Logger;
+
+/**
+ * Test case derived from:
+ * http://activemq.apache.org/camel/transactional-client.html and Martin
+ * Krasser's sample:
+ * http://www.nabble.com/JMS-Transactions---How-To-td15168958s22882.html#a15198803
+ * NOTE: had to split into separate test classes as I was unable to fully tear
+ * down and isolate the test cases, I'm not sure why, but as soon as we know the
+ * Transaction classes can be joined into one.
+ *
+ * @author Kevin Ross
+ */
+public class QueueToQueueRequestReplyTransactionTest extends AbstractTransactionTest {
+
+ private Logger log = Logger.getLogger(getClass());
+
+ public void testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector() throws Exception {
+
+ JmsComponent c = (JmsComponent)context.getComponent("activemq");
+ // c.getConfiguration().setRequestTimeout(600000);
+ JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
+
+ context.addRoutes(new SpringRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED_POLICY");
+ from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(new ConditionalExceptionProcessor()).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+ from("activemq-1:queue:bar").process(new Processor() {
+ public void process(Exchange e) {
+ String request = e.getIn().getBody(String.class);
+ Message out = e.getOut(true);
+ String selectorValue = e.getIn().getHeader("camelProvider", String.class);
+ if (selectorValue != null) {
+ out.setHeader("camelProvider", selectorValue);
+ }
+ out.setBody("Re: " + request);
+ }
+ });
+ }
+ });
+
+ Object reply = template.requestBody("activemq:queue:foo", "blah");
+ assertTrue("Received unexpeced reply", reply.equals("Re: blah"));
+ }
+/*
+ * This is a working test but is commented out because there is bug in that ConditionalExceptionProcessor
+ * gets somehow reused among different tests, which it should not and then the second test always get its request
+ * flow rolled back
+ *
+ * I didn't split this test into two separate tests as I think this will be a good reminder of the problem that
+ * needs fixing
+ *
+ * The bellow log crearly shows the same processor reused between tests
+ * testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector()
+ * org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 1
+ * org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 2
+ *
+ * testRollbackUsingXmlQueueToQueueRequestReplyUsingMessageSelectorPerProducer()
+ * org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 3
+ * org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 4
+*/
+ /*
+ public void testRollbackUsingXmlQueueToQueueRequestReplyUsingMessageSelectorPerProducer() throws Exception {
+
+ JmsComponent c = (JmsComponent)context.getComponent("activemq");
+ c.getConfiguration().setReplyToDestinationSelectorName("camelProvider");
+ JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
+ c1.getConfiguration().setReplyToDestinationSelectorName("camelProvider");
+
+ context.addRoutes(new SpringRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED_POLICY");
+ from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(new ConditionalExceptionProcessor()).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+ from("activemq-1:queue:bar").process(new Processor() {
+ public void process(Exchange e) {
+ System.out.println(e);
+ String request = e.getIn().getBody(String.class);
+ Message out = e.getOut(true);
+ String selectorValue = e.getIn().getHeader("camelProvider", String.class);
+ System.out.println("selectorValue = " + selectorValue);
+ out.setHeader("camelProvider", selectorValue);
+ out.setBody("Re: " + request);
+ }
+ });
+ }
+ });
+
+ Object reply = template.requestBody("activemq:queue:foo", "blah");
+ assertTrue("Received unexpeced reply", reply.equals("Re: blah"));
+ }
+ */
+
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml Sat May 3 01:31:48 2008
@@ -25,10 +25,18 @@
<property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
</bean>
+ <bean id="jmsConnectionFactory-1" class="org.apache.activemq.ActiveMQConnectionFactory">
+ <property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
+ </bean>
+
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
+ <bean id="jmsTransactionManager-1" class="org.springframework.jms.connection.JmsTransactionManager">
+ <property name="connectionFactory" ref="jmsConnectionFactory-1"/>
+ </bean>
+
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
@@ -36,10 +44,21 @@
<property name="concurrentConsumers" value="1"/>
</bean>
+ <bean id="jmsConfig-1" class="org.apache.camel.component.jms.JmsConfiguration">
+ <property name="connectionFactory" ref="jmsConnectionFactory-1"/>
+ <property name="transactionManager" ref="jmsTransactionManager-1"/>
+ <property name="transacted" value="true"/>
+ <property name="concurrentConsumers" value="1"/>
+ </bean>
+
<bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig"/>
</bean>
+ <bean id="activemq-1" class="org.apache.camel.component.jms.JmsComponent">
+ <property name="configuration" ref="jmsConfig-1"/>
+ </bean>
+
<bean id="PROPAGATION_REQUIRED_POLICY" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<constructor-arg>
<bean class="org.springframework.transaction.support.TransactionTemplate">