You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/25 09:31:31 UTC
svn commit: r978995 [1/2] - in /camel/trunk/components:
camel-jms/src/main/java/org/apache/camel/component/jms/
camel-jms/src/main/java/org/apache/camel/component/jms/reply/
camel-jms/src/main/java/org/apache/camel/component/jms/requestor/
camel-jms/sr...
Author: davsclaus
Date: Sun Jul 25 07:31:29 2010
New Revision: 978995
URL: http://svn.apache.org/viewvc?rev=978995&view=rev
Log:
CAMEL-2970: JmsProducer supports non blocking async routing engine for InOut Exchanges (request-reply over JMS).
Added:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (contents, props changed)
- copied, changed from r966966, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UnknownReplyMessageException.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java (with props)
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/package.html (with props)
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncJmsInOutTest.java (with props)
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncJmsProducerTest.java (with props)
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutUseMessageIDasCorrelationIDTest.java
- copied, changed from r966501, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java
Removed:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java
camel/trunk/components/camel-jms/src/test/resources/log4j.properties
camel/trunk/components/camel-test/src/main/java/org/apache/camel/test/CamelTestSupport.java
camel/trunk/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Sun Jul 25 07:31:29 2010
@@ -67,7 +67,7 @@ public class EndpointMessageListener imp
LOG.trace("onMessage START");
if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " consumer receiving JMS message: " + message);
+ LOG.debug(endpoint + " consumer received JMS message: " + message);
}
RuntimeCamelException rce = null;
@@ -82,6 +82,12 @@ public class EndpointMessageListener imp
if (LOG.isTraceEnabled()) {
LOG.trace("onMessage.process START");
}
+
+ String correlationId = message.getJMSCorrelationID();
+ if (correlationId != null) {
+ LOG.debug("Received Message has JMSCorrelationID [" + correlationId + "]");
+ }
+
processor.process(exchange);
if (LOG.isTraceEnabled()) {
LOG.trace("onMessage.process END");
@@ -292,19 +298,11 @@ public class EndpointMessageListener imp
getTemplate().send(replyDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
-
- if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
- String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);
- reply.setJMSCorrelationID(messageID);
- } else {
- String correlationID = message.getJMSCorrelationID();
- if (correlationID != null) {
- reply.setJMSCorrelationID(correlationID);
- }
- }
+ final String correlationID = determineCorrelationId(message);
+ reply.setJMSCorrelationID(correlationID);
if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " sending reply JMS message: " + reply);
+ LOG.debug(endpoint + " sending reply JMS message [correlationId:" + correlationID + "]: " + reply);
}
return reply;
}
@@ -318,7 +316,9 @@ public class EndpointMessageListener imp
try {
destination = message.getJMSReplyTo();
} catch (JMSException e) {
- LOG.trace("Cannot read JMSReplyTo header. Will ignore this exception.", e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot read JMSReplyTo header. Will ignore this exception.", e);
+ }
}
}
return destination;
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Sun Jul 25 07:31:29 2010
@@ -17,19 +17,16 @@
package org.apache.camel.component.jms;
import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.Session;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
-import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,13 +51,10 @@ import static org.apache.camel.util.Obje
public class JmsComponent extends DefaultComponent implements ApplicationContextAware, HeaderFilterStrategyAware {
private static final transient Log LOG = LogFactory.getLog(JmsComponent.class);
- private static final int DEFAULT_THREADPOOL_SIZE = 100;
private static final String DEFAULT_QUEUE_BROWSE_STRATEGY = "org.apache.camel.component.jms.DefaultQueueBrowseStrategy";
private static final String KEY_FORMAT_STRATEGY_PARAM = "jmsKeyFormatStrategy";
- private ScheduledExecutorService scheduledExecutorService;
private JmsConfiguration configuration;
private ApplicationContext applicationContext;
- private Requestor requestor;
private QueueBrowseStrategy queueBrowseStrategy;
private boolean attemptedToCreateQueueBrowserStrategy;
private HeaderFilterStrategy headerFilterStrategy = new JmsHeaderFilterStrategy();
@@ -341,30 +335,6 @@ public class JmsComponent extends Defaul
getConfiguration().setDestinationResolver(destinationResolver);
}
- public synchronized Requestor getRequestor() throws Exception {
- if (requestor == null) {
- requestor = new Requestor(getConfiguration(), getScheduledExecutorService());
- requestor.start();
- }
- return requestor;
- }
-
- public void setRequestor(Requestor requestor) {
- this.requestor = requestor;
- }
-
- public synchronized ScheduledExecutorService getScheduledExecutorService() {
- if (scheduledExecutorService == null) {
- scheduledExecutorService = getCamelContext().getExecutorServiceStrategy()
- .newScheduledThreadPool(this, "JmsComponent", DEFAULT_THREADPOOL_SIZE);
- }
- return scheduledExecutorService;
- }
-
- public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
- this.scheduledExecutorService = scheduledExecutorService;
- }
-
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@@ -401,9 +371,6 @@ public class JmsComponent extends Defaul
@Override
protected void doStop() throws Exception {
- if (requestor != null) {
- requestor.stop();
- }
super.doStop();
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sun Jul 25 07:31:29 2010
@@ -124,12 +124,6 @@ public class JmsConfiguration implements
private boolean useMessageIDAsCorrelationID;
private JmsProviderMetadata providerMetadata = new JmsProviderMetadata();
private JmsOperations metadataJmsOperations;
- // defines the component created temporary replyTo destination sharing strategy:
- // possible values are: "component", "endpoint", "producer"
- // component - a single temp queue is shared among all producers for a given component instance
- // endpoint - a single temp queue is shared among all producers for a given endpoint instance
- // producer - a single temp queue is created per producer
- private String replyToTempDestinationAffinity = REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT;
private String replyToDestination;
private String replyToDestinationSelectorName;
private JmsMessageType jmsMessageType;
@@ -157,10 +151,6 @@ public class JmsConfiguration implements
}
- public static interface MessageSentCallback {
- void sent(Message message);
- }
-
public static class CamelJmsTemplate extends JmsTemplate {
private JmsConfiguration config;
@@ -220,6 +210,9 @@ public class JmsConfiguration implements
try {
message = messageCreator.createMessage(session);
doSend(producer, message);
+ if (message != null && callback != null) {
+ callback.sent(message, destination);
+ }
// Check commit - avoid commit call within a JTA transaction.
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
@@ -228,9 +221,6 @@ public class JmsConfiguration implements
} finally {
JmsUtils.closeMessageProducer(producer);
}
- if (message != null && callback != null) {
- callback.sent(message);
- }
return null;
}
@@ -349,6 +339,9 @@ public class JmsConfiguration implements
logger.debug("Sending JMS message to: " + producer.getDestination() + " with message: " + message);
}
doSend(producer, message);
+ if (message != null && callback != null) {
+ callback.sent(message, destination);
+ }
// Check commit - avoid commit call within a JTA transaction.
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
@@ -357,9 +350,6 @@ public class JmsConfiguration implements
} finally {
JmsUtils.closeMessageProducer(producer);
}
- if (message != null && callback != null) {
- callback.sent(message);
- }
return null;
}
@@ -516,6 +506,7 @@ public class JmsConfiguration implements
// Properties
// -------------------------------------------------------------------------
+
public ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = createConnectionFactory();
@@ -863,10 +854,12 @@ public class JmsConfiguration implements
* <p/>
* By default this is false as you need to commit the outgoing request before you can consume the input
*/
+ @Deprecated
public boolean isTransactedInOut() {
return transactedInOut;
}
+ @Deprecated
public void setTransactedInOut(boolean transactedInOut) {
this.transactedInOut = transactedInOut;
}
@@ -1231,14 +1224,6 @@ public class JmsConfiguration implements
this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
}
- public String getReplyToTempDestinationAffinity() {
- return replyToTempDestinationAffinity;
- }
-
- public void setReplyToTempDestinationAffinity(String replyToTempDestinationAffinity) {
- this.replyToTempDestinationAffinity = replyToTempDestinationAffinity;
- }
-
public long getRequestTimeout() {
return requestTimeout;
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Sun Jul 25 07:31:29 2010
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.jms;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -32,12 +36,17 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
-import org.apache.camel.component.jms.requestor.Requestor;
+import org.apache.camel.Service;
+import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
+import org.apache.camel.component.jms.reply.ReplyHolder;
+import org.apache.camel.component.jms.reply.ReplyManager;
+import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.ManagementAware;
+import org.apache.camel.util.ServiceHelper;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -53,7 +62,7 @@ import org.springframework.transaction.P
* @version $Revision:520964 $
*/
@ManagedResource(description = "Managed JMS Endpoint")
-public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint>, MultipleConsumersSupport {
+public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint>, MultipleConsumersSupport, Service {
private HeaderFilterStrategy headerFilterStrategy;
private boolean pubSubDomain;
private JmsBinding binding;
@@ -61,8 +70,10 @@ public class JmsEndpoint extends Default
private Destination destination;
private String selector;
private JmsConfiguration configuration;
- private Requestor requestor;
- private ScheduledExecutorService requestorExecutorService;
+ private final Map<String, ReplyManager> replyToReplyManager = new HashMap<String, ReplyManager>();
+ private ReplyManager replyManager;
+ // scheduled executor to check for timeout (reply not received)
+ private ScheduledExecutorService replyManagerExecutorService;
public JmsEndpoint() {
this(null, null);
@@ -284,16 +295,29 @@ public class JmsEndpoint extends Default
return true;
}
- public synchronized Requestor getRequestor() throws Exception {
- if (requestor == null) {
- requestor = new Requestor(getConfiguration(), getRequestorExecutorService());
- requestor.start();
+ public synchronized ReplyManager getReplyManager() throws Exception {
+ if (replyManager == null) {
+ // use a temporary queue
+ replyManager = new TemporaryQueueReplyManager();
+ replyManager.setEndpoint(this);
+ replyManager.setScheduledExecutorService(getReplyManagerExecutorService());
+ ServiceHelper.startService(replyManager);
}
- return requestor;
+ return replyManager;
}
- public void setRequestor(Requestor requestor) {
- this.requestor = requestor;
+ public synchronized ReplyManager getReplyManager(String replyTo) throws Exception {
+ ReplyManager answer = replyToReplyManager.get(replyTo);
+ if (answer == null) {
+ // use a persistent queue
+ answer = new PersistentQueueReplyManager();
+ answer.setEndpoint(this);
+ answer.setScheduledExecutorService(getReplyManagerExecutorService());
+ ServiceHelper.startService(answer);
+ // remember this manager so we can re-use it
+ replyToReplyManager.put(replyTo, answer);
+ }
+ return answer;
}
public boolean isPubSubDomain() {
@@ -343,11 +367,28 @@ public class JmsEndpoint extends Default
return template;
}
- protected synchronized ScheduledExecutorService getRequestorExecutorService() {
- if (requestorExecutorService == null) {
- requestorExecutorService = getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "JmsRequesterTimeoutTask", 1);
+ protected synchronized ScheduledExecutorService getReplyManagerExecutorService() {
+ if (replyManagerExecutorService == null) {
+ replyManagerExecutorService = getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "JmsReplyManagerTimeoutChecker", 1);
+ }
+ return replyManagerExecutorService;
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ if (replyManager != null) {
+ ServiceHelper.stopService(replyManager);
+ replyManager = null;
+ }
+
+ if (!replyToReplyManager.isEmpty()) {
+ for (ReplyManager replyManager : replyToReplyManager.values()) {
+ ServiceHelper.stopService(replyManager);
+ }
+ replyToReplyManager.clear();
}
- return requestorExecutorService;
}
// Delegated properties from the configuration
@@ -459,10 +500,6 @@ public class JmsEndpoint extends Default
return getConfiguration().getReplyToDestinationSelectorName();
}
- public String getReplyToTempDestinationAffinity() {
- return getConfiguration().getReplyToTempDestinationAffinity();
- }
-
public long getRequestMapPurgePollTimeMillis() {
return getConfiguration().getRequestMapPurgePollTimeMillis();
}
@@ -768,10 +805,6 @@ public class JmsEndpoint extends Default
getConfiguration().setReplyToDestinationSelectorName(replyToDestinationSelectorName);
}
- public void setReplyToTempDestinationAffinity(String replyToTempDestinationAffinity) {
- getConfiguration().setReplyToTempDestinationAffinity(replyToTempDestinationAffinity);
- }
-
public void setRequestMapPurgePollTimeMillis(long requestMapPurgePollTimeMillis) {
getConfiguration().setRequestMapPurgePollTimeMillis(requestMapPurgePollTimeMillis);
}
@@ -911,4 +944,6 @@ public class JmsEndpoint extends Default
return super.createEndpointUri();
}
+
+
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java Sun Jul 25 07:31:29 2010
@@ -125,4 +125,19 @@ public final class JmsMessageHelper {
}
}
+ /**
+ * Sets the correlation id on the JMS message.
+ * <p/>
+ * Will ignore exception thrown
+ *
+ * @param message the JMS message
+ * @param correlationId the correlation id
+ */
+ public static void setCorrelationId(Message message, String correlationId) {
+ try {
+ message.setJMSCorrelationID(correlationId);
+ } catch (JMSException e) {
+ // ignore
+ }
+ }
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Sun Jul 25 07:31:29 2010
@@ -16,11 +16,7 @@
*/
package org.apache.camel.component.jms;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -28,21 +24,19 @@ import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.FailedToCreateProducerException;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate102;
-import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap;
-import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
-import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor;
-import org.apache.camel.component.jms.requestor.Requestor;
+import org.apache.camel.component.jms.reply.ReplyManager;
+import org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback;
import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.UuidGenerator;
import org.apache.camel.util.ValueHolder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
@@ -51,97 +45,46 @@ import org.springframework.jms.core.Mess
*/
public class JmsProducer extends DefaultAsyncProducer {
private static final transient Log LOG = LogFactory.getLog(JmsProducer.class);
- private RequestorAffinity affinity;
private final JmsEndpoint endpoint;
+ private final AtomicBoolean started = new AtomicBoolean(false);
private JmsOperations inOnlyTemplate;
private JmsOperations inOutTemplate;
private UuidGenerator uuidGenerator;
- private DeferredRequestReplyMap deferredRequestReplyMap;
- private Requestor requestor;
- private AtomicBoolean started = new AtomicBoolean(false);
-
- private enum RequestorAffinity {
- PER_COMPONENT(0),
- PER_ENDPOINT(1),
- PER_PRODUCER(2);
- private int value;
- private RequestorAffinity(int value) {
- this.value = value;
- }
- }
+ private ReplyManager replyManager;
public JmsProducer(JmsEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
- JmsConfiguration c = endpoint.getConfiguration();
- affinity = RequestorAffinity.PER_PRODUCER;
- if (c.getReplyTo() != null) {
- if (c.getReplyToTempDestinationAffinity().equals(JmsConfiguration.REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT)) {
- affinity = RequestorAffinity.PER_ENDPOINT;
- } else if (c.getReplyToTempDestinationAffinity().equals(JmsConfiguration.REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT)) {
- affinity = RequestorAffinity.PER_COMPONENT;
- }
- }
- }
-
- public long getRequestTimeout() {
- return endpoint.getConfiguration().getRequestTimeout();
}
- protected void doStart() throws Exception {
- super.doStart();
- }
-
- protected void testAndSetRequestor() throws RuntimeCamelException {
+ protected void initReplyManager() {
if (!started.get()) {
synchronized (this) {
if (started.get()) {
return;
}
try {
- JmsConfiguration c = endpoint.getConfiguration();
- if (c.getReplyTo() != null) {
- requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
- requestor.start();
+ if (endpoint.getReplyTo() != null) {
+ replyManager = endpoint.getReplyManager(endpoint.getReplyTo());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from: " + endpoint.getReplyTo()
+ + " queue with " + endpoint.getConcurrentConsumers() + " concurrent consumers.");
+ }
} else {
- if (affinity == RequestorAffinity.PER_PRODUCER) {
- requestor = new Requestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
- requestor.start();
- } else if (affinity == RequestorAffinity.PER_ENDPOINT) {
- requestor = endpoint.getRequestor();
- } else if (affinity == RequestorAffinity.PER_COMPONENT) {
- requestor = ((JmsComponent)endpoint.getComponent()).getRequestor();
+ replyManager = endpoint.getReplyManager();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from temporary queue with "
+ + endpoint.getConcurrentConsumers() + " concurrent consumers.");
}
}
} catch (Exception e) {
throw new FailedToCreateProducerException(endpoint, e);
}
- deferredRequestReplyMap = requestor.getDeferredRequestReplyMap(this);
started.set(true);
}
}
}
- protected void testAndUnsetRequestor() throws Exception {
- if (started.get()) {
- synchronized (this) {
- if (!started.get()) {
- return;
- }
- requestor.removeDeferredRequestReplyMap(this);
- if (affinity == RequestorAffinity.PER_PRODUCER) {
- requestor.stop();
- }
- started.set(false);
- }
- }
- }
-
- protected void doStop() throws Exception {
- testAndUnsetRequestor();
- super.doStop();
- }
-
public boolean process(Exchange exchange, AsyncCallback callback) {
if (!endpoint.isDisableReplyTo() && exchange.getPattern().isOutCapable()) {
// in out requires a bit more work than in only
@@ -173,111 +116,76 @@ public class JmsProducer extends Default
destinationName = null;
}
- testAndSetRequestor();
+ initReplyManager();
// note due to JMS transaction semantics we cannot use a single transaction
// for sending the request and receiving the response
- final Destination replyTo = requestor.getReplyTo();
-
+ final Destination replyTo = replyManager.getReplyTo();
if (replyTo == null) {
throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
}
+ // when using message id as correlation id, we need at first to use a provisional correlation id
+ // which we then update to the real JMSMessageID when the message has been sent
+ // this is done with the help of the MessageSentCallback
final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
- String correlationId = in.getHeader("JMSCorrelationID", String.class);
+ final String provisionalCorrelationId = msgIdAsCorrId ? getUuidGenerator().generateUuid() : null;
+ MessageSentCallback messageSentCallback = null;
+ if (msgIdAsCorrId) {
+ messageSentCallback = new UseMessageIdAsCorrelationIdMessageSentCallback(replyManager, provisionalCorrelationId, endpoint.getRequestTimeout());
+ }
+ final ValueHolder<MessageSentCallback> sentCallback = new ValueHolder<MessageSentCallback>(messageSentCallback);
- if (correlationId == null && !msgIdAsCorrId) {
+ final String originalCorrelationId = in.getHeader("JMSCorrelationID", String.class);
+ if (originalCorrelationId == null && !msgIdAsCorrId) {
in.setHeader("JMSCorrelationID", getUuidGenerator().generateUuid());
}
- final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
- final DeferredMessageSentCallback jmsCallback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
-
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
message.setJMSReplyTo(replyTo);
- requestor.setReplyToSelectorHeader(in, message);
+ replyManager.setReplyToSelectorHeader(in, message);
- FutureTask future;
- future = (!msgIdAsCorrId)
- ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
- : requestor.getReceiveFuture(jmsCallback);
+ String correlationId = determineCorrelationId(message, provisionalCorrelationId);
+ replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, endpoint.getRequestTimeout());
- futureHolder.set(future);
return message;
}
};
- doSend(true, destinationName, destination, messageCreator, jmsCallback);
+ doSend(true, destinationName, destination, messageCreator, sentCallback.get());
// after sending then set the OUT message id to the JMSMessageID so its identical
setMessageId(exchange);
- // now we should routing asynchronously to not block while waiting for the reply
- // TODO:
- // we need a thread pool to use for continue routing messages, just like a seda consumer
- // and we need options to configure it as well so you can indicate how many threads to use
- // TODO: Also consider requestTimeout
-
- // lets wait and return the response
- long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
- try {
- Message message = null;
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Message sent, now waiting for reply at: " + replyTo.toString());
- }
- if (requestTimeout <= 0) {
- message = (Message)futureHolder.get().get();
- } else {
- message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Future interrupted: " + e, e);
- }
- } catch (TimeoutException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Future timed out: " + e, e);
- }
- }
- if (message != null) {
- // the response can be an exception
- JmsMessage response = new JmsMessage(message, endpoint.getBinding());
- Object body = response.getBody();
-
- if (endpoint.isTransferException() && body instanceof Exception) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reply received. Setting reply as an Exception: " + body);
- }
- // we got an exception back and endpoint was configured to transfer exception
- // therefore set response as exception
- exchange.setException((Exception) body);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reply received. Setting reply as OUT message: " + body);
- }
- // regular response
- exchange.setOut(response);
- }
+ // continue routing asynchronously (reply will be processed async when its received)
+ return false;
+ }
- // restore correlation id in case the remote server messed with it
- if (correlationId != null) {
- message.setJMSCorrelationID(correlationId);
- exchange.getOut().setHeader("JMSCorrelationID", correlationId);
- }
- } else {
- // no response, so lets set a timed out exception
- exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
- }
- } catch (Exception e) {
- exchange.setException(e);
+ /**
+ * Strategy to determine which correlation id to use among <tt>JMSMessageID</tt> and <tt>JMSCorrelationID</tt>.
+ *
+ * @param message the JMS message
+ * @param provisionalCorrelationId an optional provisional correlation id, which is preferred to be used
+ * @return the correlation id to use
+ * @throws JMSException can be thrown
+ */
+ protected String determineCorrelationId(Message message, String provisionalCorrelationId) throws JMSException {
+ if (provisionalCorrelationId != null) {
+ return provisionalCorrelationId;
}
- // TODO: should be async
- callback.done(true);
- return true;
+ final String messageId = message.getJMSMessageID();
+ final String correlationId = message.getJMSCorrelationID();
+ if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
+ return messageId;
+ } else if (ObjectHelper.isEmpty(correlationId)) {
+ // correlation id is empty so fallback to message id
+ return messageId;
+ } else {
+ return correlationId;
+ }
}
protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
@@ -340,14 +248,14 @@ public class JmsProducer extends Default
/**
* Sends the message using the JmsTemplate.
*
- * @param inOut use inOut or inOnly template
+ * @param inOut use inOut or inOnly template
* @param destinationName the destination name
* @param destination the destination (if no name provided)
- * @param messageCreator the creator to create the javax.jms.Message to send
+ * @param messageCreator the creator to create the {@link Message} to send
* @param callback optional callback for inOut messages
*/
protected void doSend(boolean inOut, String destinationName, Destination destination,
- MessageCreator messageCreator, DeferredMessageSentCallback callback) {
+ MessageCreator messageCreator, MessageSentCallback callback) {
CamelJmsTemplate template = null;
CamelJmsTemplate102 template102 = null;
@@ -405,7 +313,7 @@ public class JmsProducer extends Default
}
} catch (JMSException e) {
LOG.warn("Unable to retrieve JMSMessageID from outgoing "
- + "JMS Message and set it into Camel's MessageId", e);
+ + "JMS Message and set it into Camel's MessageId", e);
}
}
}
@@ -433,9 +341,6 @@ public class JmsProducer extends Default
}
public UuidGenerator getUuidGenerator() {
- if (uuidGenerator == null) {
- uuidGenerator = UuidGenerator.get();
- }
return uuidGenerator;
}
@@ -443,4 +348,16 @@ public class JmsProducer extends Default
this.uuidGenerator = uuidGenerator;
}
+ protected void doStart() throws Exception {
+ super.doStart();
+ if (uuidGenerator == null) {
+ // use the default generator
+ uuidGenerator = UuidGenerator.get();
+ }
+ }
+
+ protected void doStop() throws Exception {
+ super.doStop();
+ }
+
}
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+
+/**
+ * Callback when a {@link Message} has been sent.
+ *
+ * @version $Revision$
+ */
+public interface MessageSentCallback {
+
+ /**
+ * Callback when the message has been sent.
+ *
+ * @param message the message
+ * @param destination the destination
+ */
+ void sent(Message message, Destination destination);
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.util.DefaultTimeoutMap;
+
+/**
+ * @version $Revision$
+ */
+public class CorrelationMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+ public CorrelationMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+ super(executor, requestMapPollTimeMillis);
+ }
+
+ public boolean onEviction(String key, ReplyHandler value) {
+ // trigger timeout
+ value.onTimeout(key);
+ // return true to remove the element
+ return true;
+ }
+
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (from r966966, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java?p2=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java&p1=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java&r1=966966&r2=978995&rev=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java Sun Jul 25 07:31:29 2010
@@ -14,17 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.jms.requestor;
+package org.apache.camel.component.jms.reply;
import java.util.HashMap;
import java.util.Map;
-public class MessageSelectorProvider {
+/**
+ * A creator which can build the JMS message selector query string to use
+ * with a shared persistent reply-to queue, so we can select the correct messages we expect as replies.
+ */
+public class MessageSelectorCreator {
protected Map<String, String> correlationIds;
protected boolean dirty = true;
protected StringBuilder expression;
- public MessageSelectorProvider() {
+ public MessageSelectorCreator() {
correlationIds = new HashMap<String, String>();
}
@@ -42,18 +46,27 @@ public class MessageSelectorProvider {
if (!dirty) {
return expression.toString();
}
+
expression = new StringBuilder("JMSCorrelationID='");
- boolean first = true;
- for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
- if (!first) {
- expression.append(" OR JMSCorrelationID='");
- }
- expression.append(entry.getValue()).append("'");
- if (first) {
- first = false;
+
+ if (correlationIds.isEmpty()) {
+ // no id's so use a dummy to select nothing
+ expression.append("CamelDummyJmsMessageSelector'");
+ } else {
+ boolean first = true;
+ for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
+ if (!first) {
+ expression.append(" OR JMSCorrelationID='");
+ }
+ expression.append(entry.getValue()).append("'");
+ if (first) {
+ first = false;
+ }
}
}
+
dirty = false;
return expression.toString();
}
-}
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using persistent queues.
+ *
+ * @version $Revision$
+ */
+public class PersistentQueueReplyHandler extends TemporaryQueueReplyHandler {
+
+ private MessageSelectorCreator dynamicMessageSelector;
+
+ public PersistentQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, long timeout, MessageSelectorCreator dynamicMessageSelector) {
+ super(replyManager, exchange, callback, originalCorrelationId, timeout);
+ this.dynamicMessageSelector = dynamicMessageSelector;
+ }
+
+ @Override
+ public void onReply(String correlationId, Message reply) {
+ if (dynamicMessageSelector != null) {
+ // remove correlation id from message selector
+ dynamicMessageSelector.removeCorrelationID(correlationId);
+ }
+ super.onReply(correlationId, reply);
+ }
+
+ @Override
+ public void onTimeout(String correlationId) {
+ if (dynamicMessageSelector != null) {
+ // remove correlation id from message selector
+ dynamicMessageSelector.removeCorrelationID(correlationId);
+ }
+ super.onTimeout(correlationId);
+ }
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.math.BigInteger;
+import java.util.Random;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
+
+/**
+ * A {@link ReplyManager} when using persistent queues.
+ *
+ * @version $Revision$
+ */
+public class PersistentQueueReplyManager extends ReplyManagerSupport {
+
+ private String replyToSelectorValue;
+ private MessageSelectorCreator dynamicMessageSelector;
+
+ public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, String correlationId, long requestTimeout) {
+ // add to correlation map
+ PersistentQueueReplyHandler handler = new PersistentQueueReplyHandler(replyManager, exchange, callback,
+ originalCorrelationId, requestTimeout, dynamicMessageSelector);
+ correlation.put(correlationId, handler, requestTimeout);
+ if (dynamicMessageSelector != null) {
+ // also remember to keep the dynamic selector updated with the new correlation id
+ dynamicMessageSelector.addCorrelationID(correlationId);
+ }
+ return correlationId;
+ }
+
+ public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) {
+ if (log.isTraceEnabled()) {
+ log.trace("Updated provisional correlationId [" + correlationId + "] to expected correlationId [" + newCorrelationId + "]");
+ }
+
+ ReplyHandler handler = correlation.remove(correlationId);
+ if (handler == null) {
+ // should not happen that we can't find the handler
+ return;
+ }
+
+ correlation.put(newCorrelationId, handler, requestTimeout);
+
+ // no not arrived early
+ if (dynamicMessageSelector != null) {
+ // also remember to keep the dynamic selector updated with the new correlation id
+ dynamicMessageSelector.addCorrelationID(newCorrelationId);
+ }
+ }
+
+ protected void handleReplyMessage(String correlationID, Message message) {
+ ReplyHandler handler = correlation.get(correlationID);
+ if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
+ handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
+ }
+
+ if (handler != null) {
+ try {
+ handler.onReply(correlationID, message);
+ } finally {
+ if (dynamicMessageSelector != null) {
+ // also remember to keep the dynamic selector updated with the new correlation id
+ dynamicMessageSelector.removeCorrelationID(correlationID);
+ }
+ correlation.remove(correlationID);
+ }
+ } else {
+ // we could not correlate the received reply message to a matching request and therefore
+ // we cannot continue routing the unknown message
+ String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
+ log.warn(text);
+ throw new UnknownReplyMessageException(text, message, correlationID);
+ }
+ }
+
+ public void setReplyToSelectorHeader(org.apache.camel.Message camelMessage, Message jmsMessage) throws JMSException {
+ String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
+ if (replyToSelectorName != null && replyToSelectorValue != null) {
+ camelMessage.setHeader(replyToSelectorName, replyToSelectorValue);
+ jmsMessage.setStringProperty(replyToSelectorName, replyToSelectorValue);
+ }
+ }
+
+ private final class DestinationResolverDelegate implements DestinationResolver {
+ private DestinationResolver delegate;
+ private Destination destination;
+
+ public DestinationResolverDelegate(DestinationResolver delegate) {
+ this.delegate = delegate;
+ }
+
+ public Destination resolveDestinationName(Session session, String destinationName,
+ boolean pubSubDomain) throws JMSException {
+ synchronized (PersistentQueueReplyManager.this) {
+ try {
+ // resolve the reply to destination
+ if (destination == null) {
+ destination = delegate.resolveDestinationName(session, destinationName, pubSubDomain);
+ setReplyTo(destination);
+ }
+ } finally {
+ PersistentQueueReplyManager.this.notifyAll();
+ }
+ }
+ return destination;
+ }
+ };
+
+ private final class PersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
+
+ private String fixedMessageSelector;
+ private MessageSelectorCreator creator;
+
+ private PersistentQueueMessageListenerContainer(String fixedMessageSelector) {
+ this.fixedMessageSelector = fixedMessageSelector;
+ }
+
+ private PersistentQueueMessageListenerContainer(MessageSelectorCreator creator) {
+ this.creator = creator;
+ }
+
+ @Override
+ public String getMessageSelector() {
+ String id = null;
+ if (fixedMessageSelector != null) {
+ id = fixedMessageSelector;
+ } else if (creator != null) {
+ id = creator.get();
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Using MessageSelector[" + id + "]");
+ }
+ return id;
+ }
+ }
+
+ protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
+ DefaultMessageListenerContainer answer;
+
+ String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
+ if (replyToSelectorName != null) {
+ // 24 max char is what IBM WebSphereMQ supports in CorrelationIDs
+ // use a fixed selector name so we can select the replies which is intended for us
+ replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
+ String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
+ answer = new PersistentQueueMessageListenerContainer(fixedMessageSelector);
+ } else {
+ // use a dynamic message selector which will select the message we want to receive as reply
+ dynamicMessageSelector = new MessageSelectorCreator();
+ answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
+ }
+
+ answer.setConnectionFactory(endpoint.getListenerConnectionFactory());
+ DestinationResolver resolver = endpoint.getDestinationResolver();
+ if (resolver == null) {
+ resolver = answer.getDestinationResolver();
+ }
+ answer.setDestinationResolver(new DestinationResolverDelegate(resolver));
+ answer.setDestinationName(endpoint.getReplyTo());
+
+ answer.setAutoStartup(true);
+ answer.setMessageListener(this);
+ answer.setPubSubDomain(false);
+ answer.setSubscriptionDurable(false);
+ answer.setConcurrentConsumers(endpoint.getConcurrentConsumers());
+
+ ExceptionListener exceptionListener = endpoint.getExceptionListener();
+ if (exceptionListener != null) {
+ answer.setExceptionListener(exceptionListener);
+ }
+
+ answer.setSessionTransacted(endpoint.isTransacted());
+ if (endpoint.isTransacted()) {
+ answer.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
+ } else {
+ if (endpoint.getAcknowledgementMode() >= 0) {
+ answer.setSessionAcknowledgeMode(endpoint.getAcknowledgementMode());
+ } else if (endpoint.getAcknowledgementModeName() != null) {
+ answer.setSessionAcknowledgeModeName(endpoint.getAcknowledgementModeName());
+ }
+ }
+
+ answer.setConcurrentConsumers(1);
+ answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
+
+ if (endpoint.getReceiveTimeout() >= 0) {
+ answer.setReceiveTimeout(endpoint.getReceiveTimeout());
+ }
+ if (endpoint.getRecoveryInterval() >= 0) {
+ answer.setRecoveryInterval(endpoint.getRecoveryInterval());
+ }
+ TaskExecutor taskExecutor = endpoint.getTaskExecutor();
+ if (taskExecutor != null) {
+ answer.setTaskExecutor(taskExecutor);
+ }
+ PlatformTransactionManager tm = endpoint.getTransactionManager();
+ if (tm != null) {
+ answer.setTransactionManager(tm);
+ } else if (endpoint.isTransacted()) {
+ throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
+ }
+ if (endpoint.getTransactionName() != null) {
+ answer.setTransactionName(endpoint.getTransactionName());
+ }
+ if (endpoint.getTransactionTimeout() >= 0) {
+ answer.setTransactionTimeout(endpoint.getTransactionTimeout());
+ }
+
+ return answer;
+ }
+
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+/**
+ * Handles a reply.
+ *
+ * @version $Revision$
+ */
+public interface ReplyHandler {
+
+ /**
+ * The reply message was received
+ *
+ * @param correlationId the correlation id
+ * @param reply the reply message
+ */
+ void onReply(String correlationId, Message reply);
+
+ /**
+ * The reply message was not received and a timeout triggered
+ *
+ * @param correlationId the correlation id
+ */
+ void onTimeout(String correlationId);
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used
+ * when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback.
+ *
+ * @version $Revision$
+ */
+public class ReplyHolder {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final Message message;
+ private final String originalCorrelationId;
+ private long timeout;
+
+ /**
+ * Constructor to use when a reply message was received
+ */
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, Message message) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.originalCorrelationId = originalCorrelationId;
+ this.message = message;
+ }
+
+ /**
+ * Constructor to use when a timeout occurred
+ */
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, long timeout) {
+ this(exchange, callback, originalCorrelationId, null);
+ this.timeout = timeout;
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ public AsyncCallback getCallback() {
+ return callback;
+ }
+
+ /**
+ * Gets the original correlation id, if one was set when sending the message.
+ * <p/>
+ * Some JMS brokers will mess with the correlation id and send back a different/empty correlation id.
+ * So we need to remember it so we can restore the correlation id.
+ */
+ public String getOriginalCorrelationId() {
+ return originalCorrelationId;
+ }
+
+ /**
+ * Gets the received message
+ *
+ * @return the received message, or <tt>null</tt> if timeout occurred and no message has been received
+ * @see #isTimeout()
+ */
+ public Message getMessage() {
+ return message;
+ }
+
+ /**
+ * Whether timeout triggered or not.
+ * <p/>
+ * A timeout is triggered if <tt>requestTimeout</tt> option has been configured, and a reply message has <b>not</b> been
+ * received within that time frame.
+ */
+ public boolean isTimeout() {
+ return message == null;
+ }
+
+ /**
+ * The timeout value
+ */
+ public long getRequestTimeout() {
+ return timeout;
+ }
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.JmsEndpoint;
+
+/**
+ * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a>
+ * over JMS.
+ *
+ * @version $Revision$
+ */
+public interface ReplyManager extends MessageListener {
+
+ /**
+ * Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}.
+ */
+ void setEndpoint(JmsEndpoint endpoint);
+
+ /**
+ * Sets the reply to queue the manager should listen for replies.
+ * <p/>
+ * The queue is either a temporary or a persistent queue.
+ */
+ void setReplyTo(Destination replyTo);
+
+ /**
+ * Sets the scheduled to use when checking for timeouts (no reply received within a given time period)
+ */
+ void setScheduledExecutorService(ScheduledExecutorService executorService);
+
+ /**
+ * Gets the reply to queue being used
+ */
+ Destination getReplyTo();
+
+ /**
+ * To be used when a persistent reply queue is used with a custom JMS selector is being used.
+ */
+ void setReplyToSelectorHeader(org.apache.camel.Message camelMessage, Message jmsMessage) throws JMSException;
+
+ /**
+ * Register a reply
+ *
+ * @param replyManager the reply manager being used
+ * @param exchange the exchange
+ * @param callback the callback
+ * @param originalCorrelationId an optional original correlation id
+ * @param correlationId the correlation id to expect being used
+ * @param requestTimeout an optional timeout
+ * @return the correlation id used
+ */
+ String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, String correlationId, long requestTimeout);
+
+ /**
+ * Updates the correlation id to the new correlation id.
+ * <p/>
+ * This is only used when <tt>useMessageIDasCorrelationID</tt> option is used, which means a
+ * provisional correlation id is first used, then after the message has been sent, the real
+ * correlation id is known. This allows us then to update the internal mapping to expect the
+ * real correlation id.
+ *
+ * @param correlationId the provisional correlation id
+ * @param newCorrelationId the real correlation id
+ * @param requestTimeout an optional timeout
+ */
+ void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout);
+
+ /**
+ * Process the reply
+ *
+ * @param holder containing needed data to process the reply and continue routing
+ */
+ void processReply(ReplyHolder holder);
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.component.jms.JmsEndpoint;
+import org.apache.camel.component.jms.JmsMessage;
+import org.apache.camel.component.jms.JmsMessageHelper;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+
+/**
+ * Base class for {@link ReplyManager} implementations.
+ *
+ * @version $Revision$
+ */
+public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
+
+ protected final Log log = LogFactory.getLog(getClass());
+ protected ScheduledExecutorService executorService;
+ protected JmsEndpoint endpoint;
+ protected Destination replyTo;
+ protected AbstractMessageListenerContainer listenerContainer;
+ protected long replyToResolverTimeout = 5000;
+ protected CorrelationMap correlation;
+
+ public void setScheduledExecutorService(ScheduledExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public void setEndpoint(JmsEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public void setReplyTo(Destination replyTo) {
+ if (log.isTraceEnabled()) {
+ log.trace("ReplyTo destination: " + replyTo);
+ }
+ this.replyTo = replyTo;
+ }
+
+ public Destination getReplyTo() {
+ synchronized (this) {
+ try {
+ // wait for the reply to destination to be resolved
+ if (replyTo == null) {
+ wait(replyToResolverTimeout);
+ }
+ } catch (Throwable e) {
+ // ignore
+ }
+ }
+ return replyTo;
+ }
+
+ public void onMessage(Message message) {
+ String correlationID = null;
+ try {
+ correlationID = message.getJMSCorrelationID();
+ } catch (JMSException e) {
+ // ignore
+ }
+ if (correlationID == null) {
+ log.warn("Ignoring message with no correlationID: " + message);
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received reply message with correlationID: " + correlationID + " -> " + message);
+ }
+
+ // handle the reply message
+ handleReplyMessage(correlationID, message);
+ }
+
+ public void processReply(ReplyHolder holder) {
+ if (holder != null && isRunAllowed()) {
+ Exchange exchange = holder.getExchange();
+ Message message = holder.getMessage();
+
+ boolean timeout = holder.isTimeout();
+ if (timeout) {
+ // no response, so lets set a timed out exception
+ exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
+ } else {
+ JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+ Object body = response.getBody();
+
+ if (endpoint.isTransferException() && body instanceof Exception) {
+ if (log.isDebugEnabled()) {
+ log.debug("Reply received. Setting reply as an Exception: " + body);
+ }
+ // we got an exception back and endpoint was configured to transfer exception
+ // therefore set response as exception
+ exchange.setException((Exception) body);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Reply received. Setting reply as OUT message: " + body);
+ }
+ // regular response
+ exchange.setOut(response);
+ }
+
+ // restore correlation id in case the remote server messed with it
+ if (holder.getOriginalCorrelationId() != null) {
+ JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
+ exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
+ }
+ }
+
+ // notify callback
+ AsyncCallback callback = holder.getCallback();
+ callback.done(false);
+ }
+ }
+
+ protected abstract void handleReplyMessage(String correlationID, Message message);
+
+ protected abstract AbstractMessageListenerContainer createListenerContainer() throws Exception;
+
+ /**
+ * <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only
+ * testing using InOut over JMS. Its unlikely to happen in a real life situation with communication
+ * to a remote broker, which always will be slower to send back reply, before Camel had a chance
+ * to update it's internal correlation map.
+ */
+ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
+ // race condition, when using messageID as correlationID then we store a provisional correlation id
+ // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
+ // event that the reply comes back really really fast, and the correlation map hasn't yet been updated
+ // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
+ if (log.isWarnEnabled()) {
+ log.warn("Early reply received with correlationID [" + correlationID + "] -> " + message);
+ }
+
+ ReplyHandler answer = null;
+
+ // wait up till 5 seconds
+ boolean done = false;
+ int counter = 0;
+ while (!done && counter++ < 50) {
+ if (log.isTraceEnabled()) {
+ log.trace("Early reply not found handler at attempt " + counter + ". Waiting a bit longer.");
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ // try again
+ answer = correlation.get(correlationID);
+ done = answer != null;
+
+ if (answer != null) {
+ if (log.isTraceEnabled()) {
+ log.trace("Early reply with correlationID [" + correlationID + "] has been matched after "
+ + counter + " attempts and can be processed using handler: " + answer);
+ }
+ }
+ }
+
+ return answer;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(executorService, "executorService", this);
+ ObjectHelper.notNull(endpoint, "endpoint", this);
+
+ // purge for timeout every second
+ correlation = new CorrelationMap(executorService, 1000);
+ ServiceHelper.startService(correlation);
+
+ // create JMS listener and start it
+ listenerContainer = createListenerContainer();
+ listenerContainer.afterPropertiesSet();
+ listenerContainer.start();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(correlation);
+
+ if (listenerContainer != null) {
+ listenerContainer.stop();
+ listenerContainer.destroy();
+ listenerContainer = null;
+ }
+ }
+
+}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using temporary queues.
+ *
+ * @version $Revision$
+ */
+public class TemporaryQueueReplyHandler implements ReplyHandler {
+
+ // task queue to add the holder so we can process the reply
+ protected final ReplyManager replyManager;
+ protected final Exchange exchange;
+ protected final AsyncCallback callback;
+ // remember the original correlation id, in case the server returns back a reply with a messed up correlation id
+ protected final String originalCorrelationId;
+ protected final long timeout;
+
+ public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, long timeout) {
+ this.replyManager = replyManager;
+ this.exchange = exchange;
+ this.originalCorrelationId = originalCorrelationId;
+ this.callback = callback;
+ this.timeout = timeout;
+ }
+
+ public void onReply(String correlationId, Message reply) {
+ // create holder object with the reply and add to task queue so we can process the reply and continue
+ // route the exchange using the async routing engine
+ ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, reply);
+ // process reply
+ replyManager.processReply(holder);
+ }
+
+ public void onTimeout(String correlationId) {
+ // create holder object without the reply which means a timeout occurred
+ ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, timeout);
+ // process timeout
+ replyManager.processReply(holder);
+ }
+}