You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2008/04/17 18:08:08 UTC
svn commit: r649165 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/util/
components/camel-jms/src/main/java/org/apache/camel/component/jms/
components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/
components/c...
Author: hadrian
Date: Thu Apr 17 09:08:02 2008
New Revision: 649165
URL: http://svn.apache.org/viewvc?rev=649165&view=rev
Log:
CAMEL-469. Patch applied with thanks!
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Apr 17 09:08:02 2008
@@ -93,10 +93,16 @@
Object[] keys = null;
synchronized (map) {
Set keySet = map.keySet();
- keys = new String[keySet.size()];
+ keys = new Object[keySet.size()];
keySet.toArray(keys);
}
return keys;
+ }
+
+ public int size() {
+ synchronized (map) {
+ return map.size();
+ }
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java Thu Apr 17 09:08:02 2008
@@ -39,6 +39,7 @@
*/
Object[] getKeys();
+ int size();
/**
* Adds the key value pair into the map such that some time after the given
* timeout the entry will be evicted
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Thu Apr 17 09:08:02 2008
@@ -88,7 +88,7 @@
// -------------------------------------------------------------------------
public JmsBinding getBinding() {
if (binding == null) {
- binding = new JmsBinding();
+ binding = new JmsBinding(endpoint);
}
return binding;
}
@@ -161,10 +161,14 @@
public Message createMessage(Session session) throws JMSException {
Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session);
- // lets preserve any correlation ID
- String correlationID = message.getJMSCorrelationID();
- if (correlationID != null) {
- reply.setJMSCorrelationID(correlationID);
+ if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
+ String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);
+ reply.setJMSCorrelationID(messageID);
+ } else {
+ String correlationID = message.getJMSCorrelationID();
+ if (correlationID != null) {
+ reply.setJMSCorrelationID(correlationID);
+ }
}
if (LOG.isDebugEnabled()) {
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Thu Apr 17 09:08:02 2008
@@ -56,9 +56,16 @@
*/
public class JmsBinding {
private static final transient Log LOG = LogFactory.getLog(JmsBinding.class);
+ private JmsEndpoint endpoint;
private Set<String> ignoreJmsHeaders;
private XmlConverter xmlConverter = new XmlConverter();
+ public JmsBinding() {
+ }
+
+ public JmsBinding(JmsEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
/**
* Extracts the body from the JMS message
*
@@ -118,7 +125,8 @@
public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session)
throws JMSException {
Message answer = null;
- if (camelMessage instanceof JmsMessage) {
+ boolean alwaysCopy = (endpoint != null) ? endpoint.getConfiguration().isAlwaysCopyMessage() : false;
+ if (!alwaysCopy && camelMessage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage)camelMessage;
answer = jmsMessage.getJmsMessage();
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Thu Apr 17 09:08:02 2008
@@ -241,6 +241,14 @@
getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled);
}
+ public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
+ getConfiguration().setAlwaysCopyMessage(alwaysCopyMessage);
+ }
+
+ public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
+ getConfiguration().setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID);
+ }
+
public void setPriority(int priority) {
getConfiguration().setPriority(priority);
}
@@ -301,7 +309,7 @@
getConfiguration().setDestinationResolver(destinationResolver);
}
- public Requestor getRequestor() throws Exception {
+ public synchronized Requestor getRequestor() throws Exception {
if (requestor == null) {
requestor = new Requestor(getConfiguration(), getExecutorService());
requestor.start();
@@ -361,7 +369,7 @@
remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
}
- final String subject = convertPathToActualDestination(remaining);
+ final String subject = convertPathToActualDestination(remaining, parameters);
// lets make sure we copy the configuration as each endpoint can
// customize its own version
@@ -386,7 +394,7 @@
* A strategy method allowing the URI destination to be translated into the
* actual JMS destination name (say by looking up in JNDI or something)
*/
- protected String convertPathToActualDestination(String path) {
+ protected String convertPathToActualDestination(String path, Map parameters) {
return path;
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Thu Apr 17 09:08:02 2008
@@ -17,6 +17,7 @@
package org.apache.camel.component.jms;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -26,12 +27,18 @@
import javax.jms.TopicPublisher;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.PackageHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.JmsTemplate102;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer102;
@@ -40,14 +47,18 @@
import org.springframework.jms.listener.serversession.ServerSessionFactory;
import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer;
import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102;
+import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.util.Assert;
+
/**
* @version $Revision$
*/
public class JmsConfiguration implements Cloneable {
+ private static final transient Log LOG = LogFactory.getLog(JmsConfiguration.class);
protected static final String TRANSACTED = "TRANSACTED";
protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
@@ -83,6 +94,7 @@
private boolean useVersion102;
private Boolean explicitQosEnabled;
private boolean deliveryPersistent = true;
+ private boolean replyToDeliveryPersistent = true;
private long timeToLive = -1;
private MessageConverter messageConverter;
private boolean messageIdEnabled = true;
@@ -97,6 +109,9 @@
private long requestMapPurgePollTimeMillis = 1000L;
private boolean disableReplyTo;
private boolean eagerLoadingOfProperties;
+ // Always make a JMS message copy when it's passed to Producer
+ private boolean alwaysCopyMessage = false;
+ private boolean useMessageIDAsCorrelationID = false;
public JmsConfiguration() {
}
@@ -129,69 +144,156 @@
}
return answer;
}
-
- public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String destination) {
-
- if (jmsOperations != null) {
- return jmsOperations;
- }
-
- ConnectionFactory factory = getTemplateConnectionFactory();
-
- // I whish the spring templates had built in support for preserving the
- // message
- // qos when doing a send. :(
- JmsTemplate template = useVersion102 ? new JmsTemplate102(factory, pubSubDomain) {
- /**
- * Override so we can support preserving the Qos settings that have
- * been set on the message.
- */
- @Override
- protected void doSend(MessageProducer producer, Message message) throws JMSException {
- if (preserveMessageQos) {
- long ttl = message.getJMSExpiration();
- if (ttl != 0) {
- ttl = ttl - System.currentTimeMillis();
- // Message had expired.. so set the ttl as small as
- // possible
- if (ttl <= 0) {
- ttl = 1;
+
+ public static interface MessageSentCallback {
+ public void sent(Message message);
+ }
+
+ public static class CamelJmsTemplate extends JmsTemplate {
+ private JmsConfiguration config;
+
+ public CamelJmsTemplate(JmsConfiguration config, ConnectionFactory connectionFactory) {
+ super(connectionFactory);
+ this.config = config;
+ }
+
+ public void send(final String destinationName,
+ final MessageCreator messageCreator,
+ final MessageSentCallback callback) throws JmsException {
+ execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ Destination destination = resolveDestinationName(session, destinationName);
+ Assert.notNull(messageCreator, "MessageCreator must not be null");
+ MessageProducer producer = createProducer(session, destination);
+ Message message = null;
+ try {
+ message = messageCreator.createMessage(session);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending created message: " + message);
+ }
+ doSend(producer, message);
+ // Check commit - avoid commit call within a JTA transaction.
+ if (session.getTransacted() && isSessionLocallyTransacted(session)) {
+ // Transacted session created by this template -> commit.
+ JmsUtils.commitIfNecessary(session);
}
}
- if (isPubSubDomain()) {
- ((TopicPublisher)producer).publish(message, message.getJMSDeliveryMode(), message
- .getJMSPriority(), ttl);
- } else {
- ((QueueSender)producer).send(message, message.getJMSDeliveryMode(), message
- .getJMSPriority(), ttl);
+ finally {
+ JmsUtils.closeMessageProducer(producer);
+ }
+ if (message != null && callback != null) {
+ callback.sent(message);
+ }
+ return null;
+ }
+ }, false);
+ }
+
+ /**
+ * Override so we can support preserving the Qos settings that have
+ * been set on the message.
+ */
+ @Override
+ protected void doSend(MessageProducer producer, Message message) throws JMSException {
+ if (config.isPreserveMessageQos()) {
+ long ttl = message.getJMSExpiration();
+ if (ttl != 0) {
+ ttl = ttl - System.currentTimeMillis();
+ // Message had expired.. so set the ttl as small as
+ // possible
+ if (ttl <= 0) {
+ ttl = 1;
}
- } else {
- super.doSend(producer, message);
}
+ producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
}
- } : new JmsTemplate(factory) {
- /**
- * Override so we can support preserving the Qos settings that have
- * been set on the message.
- */
- @Override
- protected void doSend(MessageProducer producer, Message message) throws JMSException {
- if (preserveMessageQos) {
- long ttl = message.getJMSExpiration();
- if (ttl != 0) {
- ttl = ttl - System.currentTimeMillis();
- // Message had expired.. so set the ttl as small as
- // possible
- if (ttl <= 0) {
- ttl = 1;
- }
+ else {
+ super.doSend(producer, message);
+ }
+ }
+ }
+
+ public static class CamelJmsTeemplate102 extends JmsTemplate102 {
+ private JmsConfiguration config;
+
+ public CamelJmsTeemplate102(JmsConfiguration config, ConnectionFactory connectionFactory, boolean pubSubDomain) {
+ super(connectionFactory, pubSubDomain);
+ this.config = config;
+ }
+
+ public void send(final String destinationName,
+ final MessageCreator messageCreator,
+ final MessageSentCallback callback) throws JmsException {
+ execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ Destination destination = resolveDestinationName(session, destinationName);
+ Assert.notNull(messageCreator, "MessageCreator must not be null");
+ MessageProducer producer = createProducer(session, destination);
+ Message message = null;
+ try {
+ message = messageCreator.createMessage(session);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending created message: " + message);
+ }
+ doSend(producer, message);
+ // Check commit - avoid commit call within a JTA transaction.
+ if (session.getTransacted() && isSessionLocallyTransacted(session)) {
+ // Transacted session created by this template -> commit.
+ JmsUtils.commitIfNecessary(session);
+ }
+ }
+ finally {
+ JmsUtils.closeMessageProducer(producer);
+ }
+ if (message != null && callback != null) {
+ callback.sent(message);
+ }
+ return null;
+ }
+ }, false);
+ }
+
+ /**
+ * Override so we can support preserving the Qos settings that have
+ * been set on the message.
+ */
+ @Override
+ protected void doSend(MessageProducer producer, Message message) throws JMSException {
+ if (config.isPreserveMessageQos()) {
+ long ttl = message.getJMSExpiration();
+ if (ttl != 0) {
+ ttl = ttl - System.currentTimeMillis();
+ // Message had expired.. so set the ttl as small as
+ // possible
+ if (ttl <= 0) {
+ ttl = 1;
}
- producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
+ }
+ if (isPubSubDomain()) {
+ ((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(),
+ message.getJMSPriority(), ttl);
} else {
- super.doSend(producer, message);
+ ((QueueSender) producer).send(message, message.getJMSDeliveryMode(),
+ message.getJMSPriority(), ttl);
}
}
- };
+ else {
+ super.doSend(producer, message);
+ }
+ }
+ }
+
+ public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String destination) {
+
+ if (jmsOperations != null) {
+ return jmsOperations;
+ }
+
+ ConnectionFactory factory = getTemplateConnectionFactory();
+
+ JmsTemplate template = useVersion102 ?
+ new CamelJmsTeemplate102(this, factory, pubSubDomain)
+ : new CamelJmsTemplate(this, factory);
template.setPubSubDomain(pubSubDomain);
if (destinationResolver != null) {
@@ -499,6 +601,14 @@
configuredQoS();
}
+ public boolean isReplyToDeliveryPersistent() {
+ return replyToDeliveryPersistent;
+ }
+
+ public void setReplyToDeliveryPersistent(boolean replyToDeliveryPersistent) {
+ this.replyToDeliveryPersistent = replyToDeliveryPersistent;
+ }
+
public long getTimeToLive() {
return timeToLive;
}
@@ -531,7 +641,7 @@
public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
this.messageTimestampEnabled = messageTimestampEnabled;
}
-
+
public int getPriority() {
return priority;
}
@@ -760,6 +870,15 @@
if (isEagerLoadingOfProperties()) {
listener.setEagerLoadingOfProperties(true);
}
+ // REVISIT: We really ought to change the model and let JmsProducer
+ // and JmsConsumer have their own JmsConfiguration instance
+ // This way producer's and consumer's QoS can differ and be
+ // independently configured
+ JmsOperations operations = listener.getTemplate();
+ if (operations instanceof JmsTemplate) {
+ JmsTemplate template = (JmsTemplate)operations;
+ template.setDeliveryPersistent(isReplyToDeliveryPersistent());
+ }
}
protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
// TODO we could allow a spring container to auto-inject these objects?
@@ -851,4 +970,20 @@
}
}
+
+ public boolean isAlwaysCopyMessage() {
+ return alwaysCopyMessage;
+ }
+
+ public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
+ this.alwaysCopyMessage = alwaysCopyMessage;
+ }
+
+ public boolean isUseMessageIDAsCorrelationID() {
+ return useMessageIDAsCorrelationID;
+ }
+
+ public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
+ this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
+ }
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Thu Apr 17 09:08:02 2008
@@ -30,12 +30,13 @@
*/
public class JmsConsumer extends DefaultConsumer<JmsExchange> {
private final AbstractMessageListenerContainer listenerContainer;
+ private EndpointMessageListener messageListener;
public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
super(endpoint, processor);
this.listenerContainer = listenerContainer;
- MessageListener messageListener = createMessageListener(endpoint, processor);
+ createMessageListener(endpoint, processor);
this.listenerContainer.setMessageListener(messageListener);
}
@@ -43,10 +44,13 @@
return listenerContainer;
}
- protected MessageListener createMessageListener(JmsEndpoint endpoint, Processor processor) {
- EndpointMessageListener messageListener = new EndpointMessageListener(endpoint, processor);
- messageListener.setBinding(endpoint.getBinding());
+ public EndpointMessageListener getEndpointMessageListener() {
return messageListener;
+ }
+
+ protected void createMessageListener(JmsEndpoint endpoint, Processor processor) {
+ messageListener = new EndpointMessageListener(endpoint, processor);
+ messageListener.setBinding(endpoint.getBinding());
}
@Override
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu Apr 17 09:08:02 2008
@@ -123,7 +123,7 @@
// -------------------------------------------------------------------------
public JmsBinding getBinding() {
if (binding == null) {
- binding = new JmsBinding();
+ binding = new JmsBinding(this);
}
return binding;
}
@@ -161,7 +161,7 @@
return false;
}
- public Requestor getRequestor() throws Exception {
+ public synchronized Requestor getRequestor() throws Exception {
if (requestor == null) {
requestor = component.getRequestor();
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Thu Apr 17 09:08:02 2008
@@ -28,13 +28,21 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
+import org.apache.camel.component.jms.JmsConfiguration.MessageSentCallback;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap;
+import org.apache.camel.component.jms.requestor.FailedToProcessResponse;
import org.apache.camel.component.jms.requestor.Requestor;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.Out;
import org.apache.camel.util.UuidGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
/**
* @version $Revision$
@@ -45,12 +53,28 @@
private JmsOperations inOnlyTemplate;
private JmsOperations inOutTemplate;
private UuidGenerator uuidGenerator;
+ private DeferredRequestReplyMap deferredRequestReplyMap;
public JmsProducer(JmsEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
+ public long getRequestTimeout() {
+ return endpoint.getRequestTimeout();
+ }
+
+ protected void doStart() throws Exception {
+ super.doStart();
+ deferredRequestReplyMap = endpoint.getRequestor().getDeferredRequestReplyMap(this);
+ }
+
+ protected void doStop() throws Exception {
+ endpoint.getRequestor().removeDeferredRequestReplyMap(this);
+ deferredRequestReplyMap = null;
+ super.doStop();
+ }
+
public void process(final Exchange exchange) {
final org.apache.camel.Message in = exchange.getIn();
@@ -58,7 +82,7 @@
// create a temporary queue and consumer for responses...
// note due to JMS transaction semantics we cannot use a single transaction
// for sending the request and receiving the response
- Requestor requestor;
+ final Requestor requestor;
try {
requestor = endpoint.getRequestor();
} catch (Exception e) {
@@ -67,36 +91,47 @@
final Destination replyTo = requestor.getReplyTo();
+ final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
String correlationId = in.getHeader("JMSCorrelationID", String.class);
- if (correlationId == null) {
- correlationId = getUuidGenerator().generateId();
- in.setHeader("JMSCorrelationID", correlationId);
+
+ if (correlationId == null && !msgIdAsCorrId) {
+ in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
}
+
+ final Out<FutureTask> futureHolder = new Out<FutureTask>();
+ final DeferredMessageSentCallback callback = (msgIdAsCorrId) ?
+ deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
- // lets register the future object before we try send just in case
- long requestTimeout = endpoint.getRequestTimeout();
- FutureTask future = requestor.getReceiveFuture(correlationId, requestTimeout);
-
- getInOutTemplate().send(endpoint.getDestination(), new MessageCreator() {
+ final CamelJmsTemplate template = (CamelJmsTemplate)getInOutTemplate();
+ template.send(endpoint.getDestination(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
message.setJMSReplyTo(replyTo);
+
+ FutureTask future = null;
+ future = (!msgIdAsCorrId) ?
+ requestor.getReceiveFuture(message.getJMSCorrelationID(),
+ endpoint.getRequestTimeout())
+ : requestor.getReceiveFuture(callback);
+ futureHolder.set(future);
+
if (LOG.isDebugEnabled()) {
LOG.debug(endpoint + " sending JMS message: " + message);
}
return message;
}
- });
-
+ }, callback);
+
// lets wait and return the response
+ long requestTimeout = endpoint.getRequestTimeout();
try {
Message message = null;
try {
if (requestTimeout < 0) {
- message = (Message)future.get();
+ message = (Message)futureHolder.get().get();
} else {
- message = (Message)future.get(requestTimeout, TimeUnit.MILLISECONDS);
+ message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
@@ -109,7 +144,12 @@
}
if (message != null) {
exchange.setOut(new JmsMessage(message, endpoint.getBinding()));
- } else {
+ if (correlationId != null) {
+ message.setJMSCorrelationID(correlationId);
+ exchange.getOut(false).setHeader("JMSCorrelationID", correlationId);
+ }
+ }
+ else {
// lets set a timed out exception
exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Thu Apr 17 09:08:02 2008
@@ -16,8 +16,11 @@
*/
package org.apache.camel.component.jms.requestor;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
@@ -31,6 +34,9 @@
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.DefaultTimeoutMap;
import org.apache.camel.util.TimeoutMap;
+import org.apache.camel.util.UuidGenerator;
+import org.apache.camel.component.jms.JmsProducer;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
@@ -45,26 +51,70 @@
public class Requestor extends ServiceSupport implements MessageListener {
private static final transient Log LOG = LogFactory.getLog(Requestor.class);
private final JmsConfiguration configuration;
+ private ScheduledExecutorService executorService;
private AbstractMessageListenerContainer listenerContainer;
private TimeoutMap requestMap;
+ private Map<JmsProducer, DeferredRequestReplyMap> producerDeferredRequestReplyMap;
+ private TimeoutMap deferredRequestMap;
+ private TimeoutMap deferredReplyMap;
private Destination replyTo;
-
+ private long maxRequestTimeout = -1;
+ private static UuidGenerator uuidGenerator;
+
public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
this.configuration = configuration;
+ this.executorService = executorService;
requestMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+ producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>();
+ deferredRequestMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+ deferredReplyMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+ }
+
+ public synchronized DeferredRequestReplyMap getDeferredRequestReplyMap(JmsProducer producer) {
+ DeferredRequestReplyMap map = producerDeferredRequestReplyMap.get(producer);
+ if (map == null) {
+ map = new DeferredRequestReplyMap(this, producer, deferredRequestMap, deferredReplyMap);
+ producerDeferredRequestReplyMap.put(producer, map);
+ }
+ if (maxRequestTimeout == -1) {
+ maxRequestTimeout = producer.getRequestTimeout();
+ } else if (maxRequestTimeout < producer.getRequestTimeout()) {
+ maxRequestTimeout = producer.getRequestTimeout();
+ }
+ return map;
+ }
+
+ public synchronized void removeDeferredRequestReplyMap(JmsProducer producer) {
+ producerDeferredRequestReplyMap.remove(producer);
+ if (maxRequestTimeout == producer.getRequestTimeout()) {
+ long max = -1;
+ for (Map.Entry<JmsProducer, DeferredRequestReplyMap> entry : producerDeferredRequestReplyMap.entrySet()) {
+ if (max < entry.getKey().getRequestTimeout()) {
+ max = entry.getKey().getRequestTimeout();
+ }
+ }
+ maxRequestTimeout = max;
+ }
+ }
+
+ public synchronized long getMaxRequestTimeout() {
+ return maxRequestTimeout;
+ }
+
+ public TimeoutMap getRequestMap() {
+ return requestMap;
+ }
+
+ public TimeoutMap getDeferredRequestMap() {
+ return deferredRequestMap;
+ }
+
+ public TimeoutMap getDeferredReplyMap() {
+ return deferredReplyMap;
}
public FutureTask getReceiveFuture(String correlationID, long requestTimeout) {
FutureTask future = null;
-/*
- // Deal with async handlers...
-
- Object currentHandler = requestMap.get(correlationID);
- if (currentHandler instanceof AsyncReplyHandler) {
- AsyncReplyHandler handler = (AsyncReplyHandler) currentHandler;
- future = handler.newResultHandler();
- }
-*/
if (future == null) {
FutureHandler futureHandler = new FutureHandler();
@@ -74,6 +124,13 @@
return future;
}
+ public FutureTask getReceiveFuture(DeferredMessageSentCallback callback) {
+ FutureTask future = new FutureHandler();
+ DeferredRequestReplyMap map = callback.getDeferredRequestReplyMap();
+ map.put(callback, future);
+ return future;
+ }
+
public void onMessage(Message message) {
try {
String correlationID = message.getJMSCorrelationID();
@@ -81,23 +138,24 @@
LOG.warn("Ignoring message with no correlationID! " + message);
return;
}
-
// lets notify the monitor for this response
Object handler = requestMap.get(correlationID);
- if (handler == null) {
- LOG.warn("Response received for unknown correlationID: " + correlationID + " request: "
- + message);
- } else if (handler instanceof ReplyHandler) {
- ReplyHandler replyHandler = (ReplyHandler)handler;
+ if (handler != null && handler instanceof ReplyHandler) {
+ ReplyHandler replyHandler = (ReplyHandler) handler;
boolean complete = replyHandler.handle(message);
if (complete) {
requestMap.remove(correlationID);
}
+ } else {
+ DeferredRequestReplyMap.processDeferredRequests(
+ this, deferredRequestMap, deferredReplyMap,
+ correlationID, getMaxRequestTimeout(), message);
}
} catch (JMSException e) {
throw new FailedToProcessResponse(message, e);
}
}
+
public AbstractMessageListenerContainer getListenerContainer() {
if (listenerContainer == null) {
@@ -166,5 +224,12 @@
answer.setExceptionListener(exceptionListener);
}
return answer;
+ }
+
+ public static synchronized UuidGenerator getUuidGenerator() {
+ if (uuidGenerator == null) {
+ uuidGenerator = new UuidGenerator();
+ }
+ return uuidGenerator;
}
}
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Thu Apr 17 09:08:02 2008
@@ -18,12 +18,15 @@
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
@@ -83,6 +86,20 @@
public void testCacheConsumerEnabledForTopic() throws Exception {
JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:topic:Foo.Bar");
assertCacheLevel(endpoint, DefaultMessageListenerContainer.CACHE_CONSUMER);
+ }
+
+ public void testReplyToPesistentDelivery() throws Exception {
+ JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo");
+ endpoint.getConfiguration().setDeliveryPersistent(true);
+ endpoint.getConfiguration().setReplyToDeliveryPersistent(false);
+ JmsProducer producer = endpoint.createProducer();
+ JmsConsumer consumer = endpoint.createConsumer(dummyProcessor);
+ JmsOperations operations = consumer.getEndpointMessageListener().getTemplate();
+ assertTrue(operations instanceof JmsTemplate);
+ JmsTemplate template = (JmsTemplate)operations;
+ assertTrue("Wrong delivery mode on reply template; expected "
+ + " DeliveryMode.NON_PERSISTENT but was DeliveryMode.PERSISTENT",
+ template.getDeliveryMode() == DeliveryMode.NON_PERSISTENT);
}
protected void assertCacheLevel(JmsEndpoint endpoint, int expected) throws Exception {