You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/08/26 16:33:17 UTC
svn commit: r689094 - in
/servicemix/components/bindings/servicemix-jms/trunk/src:
main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Author: gnodet
Date: Tue Aug 26 07:33:16 2008
New Revision: 689094
URL: http://svn.apache.org/viewvc?rev=689094&view=rev
Log:
SM-1520: make the jms provider endpoint asynchronous
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=689094&r1=689093&r2=689094&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Tue Aug 26 07:33:16 2008
@@ -19,13 +19,19 @@
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.RobustInOnly;
+import javax.jbi.messaging.InOut;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
+import javax.jms.MessageListener;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.jms.JmsEndpointType;
import org.apache.servicemix.store.Store;
import org.apache.servicemix.store.StoreFactory;
@@ -36,11 +42,15 @@
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.listener.AbstractPollingMessageListenerContainer;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
/**
- *
+ * A JMS provider endpoint
+ *
* @author gnodet
* @org.apache.xbean.XBean element="provider"
* @since 3.2
@@ -52,6 +62,7 @@
private JmsProviderMarshaler marshaler = new DefaultProviderMarshaler();
private DestinationChooser destinationChooser = new SimpleDestinationChooser();
+ private DestinationChooser replyDestinationChooser = new SimpleDestinationChooser();
private JmsTemplate template;
private boolean jms102;
@@ -63,7 +74,7 @@
private boolean messageIdEnabled = true;
private boolean messageTimestampEnabled = true;
private boolean pubSubNoLocal;
- private long receiveTimeout = AbstractPollingMessageListenerContainer.DEFAULT_RECEIVE_TIMEOUT;
+ private long receiveTimeout = JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT;
private boolean explicitQosEnabled;
private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
private int priority = Message.DEFAULT_PRIORITY;
@@ -71,11 +82,12 @@
private Destination replyDestination;
private String replyDestinationName;
-
- private boolean stateless;
+
private StoreFactory storeFactory;
private Store store;
-
+
+ private AbstractMessageListenerContainer listenerContainer;
+
/**
* @return the destination
*/
@@ -164,6 +176,19 @@
}
/**
+ * @return the replyDestinationChooser
+ */
+ public DestinationChooser getReplyDestinationChooser() {
+ return replyDestinationChooser;
+ }
+
+ /**
+ * @param replyDestinationChooser the replyDestinationChooser to set
+ */
+ public void setReplyDestinationChooser(DestinationChooser replyDestinationChooser) {
+ this.replyDestinationChooser = replyDestinationChooser;
+ }
+ /**
* @return the destinationResolver
*/
public DestinationResolver getDestinationResolver() {
@@ -306,18 +331,16 @@
this.timeToLive = timeToLive;
}
- public boolean isStateless() {
- return stateless;
- }
-
- public void setStateless(boolean stateless) {
- this.stateless = stateless;
- }
-
public StoreFactory getStoreFactory() {
return storeFactory;
}
+ /**
+ * Sets the store factory used to create the store.
+ * If none is set, a {@link MemoryStoreFactory} will be created and used instead.
+ *
+ * @param storeFactory
+ */
public void setStoreFactory(StoreFactory storeFactory) {
this.storeFactory = storeFactory;
}
@@ -326,6 +349,12 @@
return store;
}
+ /**
+ * Sets the store used to store JBI exchanges that are waiting for a response
+ * JMS message. The store will be automatically created if not set.
+ *
+ * @param store
+ */
public void setStore(Store store) {
this.store = store;
}
@@ -334,6 +363,14 @@
return replyDestination;
}
+ /**
+ * Sets the reply destination.
+ * This JMS destination will be used as the default destination for the response
+ * messages when using an InOut JBI exchange. It will be used if the
+ * <code>replyDestinationChooser</code> does not return any value.
+ *
+ * @param replyDestination
+ */
public void setReplyDestination(Destination replyDestination) {
this.replyDestination = replyDestination;
}
@@ -342,42 +379,128 @@
return replyDestinationName;
}
+ /**
+ * Sets the name of the reply destination.
+ * This property will be used to create the <code>replyDestination</code>
+ * using the <code>destinationResolver</code> when the endpoint starts if
+ * the <code>replyDestination</code> has not been set.
+ *
+ * @param replyDestinationName
+ */
public void setReplyDestinationName(String replyDestinationName) {
this.replyDestinationName = replyDestinationName;
}
- protected void processInOnly(final MessageExchange exchange, final NormalizedMessage in) throws Exception {
- MessageCreator creator = new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- try {
- Message message = marshaler.createMessage(exchange, in, session);
- if (logger.isTraceEnabled()) {
- logger.trace("Sending message to: " + template.getDefaultDestinationName() + " message: " + message);
+ /**
+ * Process the incoming JBI exchange
+ * @param exchange
+ * @throws Exception
+ */
+ public void process(MessageExchange exchange) throws Exception {
+ // The component acts as a provider, this means that another component has requested our service
+ // As this exchange is active, this is either an in or a fault (out are sent by this component)
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ // Exchange is finished
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ // Exchange has been aborted with an exception
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ // Exchange is active
+ } else {
+ NormalizedMessage in;
+ // Fault message
+ if (exchange.getFault() != null) {
+ done(exchange);
+ // In message
+ } else if ((in = exchange.getMessage("in")) != null) {
+ if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
+ processInOnly(exchange, in);
+ done(exchange);
}
- return message;
- } catch (Exception e) {
- JMSException jmsEx = new JMSException("Failed to create JMS Message: " + e);
- jmsEx.setLinkedException(e);
- jmsEx.initCause(e);
- throw jmsEx;
+ else {
+ processInOut(exchange, in);
+ }
+ // This is not compliant with the default MEPs
+ } else {
+ throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
}
}
- };
- Object dest = destinationChooser.chooseDestination(exchange, in);
- if (dest instanceof Destination) {
- template.send((Destination) dest, creator);
- } else if (dest instanceof String) {
- template.send((String) dest, creator);
+ // Unsupported role: this should never happen as we never create exchanges
} else {
- template.send(creator);
+ throw new IllegalStateException("Unsupported role: " + exchange.getRole());
}
}
- protected void processInOut(final MessageExchange exchange, final NormalizedMessage in, final NormalizedMessage out) throws Exception {
+ /**
+ * Process an InOnly or RobustInOnly exchange.
+ * This method uses the JMS template to create a session and call the
+ * {@link #processInOnlyInSession(javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage, javax.jms.Session)}
+ * method.
+ *
+ * @param exchange
+ * @param in
+ * @throws Exception
+ */
+ protected void processInOnly(final MessageExchange exchange,
+ final NormalizedMessage in) throws Exception {
+ SessionCallback callback = new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ try {
+ processInOnlyInSession(exchange, in, session);
+ return null;
+ } catch (JMSException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new UncategorizedJmsException(e);
+ }
+ }
+ };
+ template.execute(callback, true);
+ }
+
+ /**
+ * Process an InOnly or RobustInOnly exchange inside a JMS session.
+ * This method delegates the JMS message creation to the marshaler and uses
+ * the JMS template to send it.
+ *
+ * @param exchange
+ * @param in
+ * @param session
+ * @throws Exception
+ */
+ protected void processInOnlyInSession(final MessageExchange exchange,
+ final NormalizedMessage in,
+ final Session session) throws Exception {
+ // Create destination
+ final Destination dest = getDestination(exchange, in, session);
+ // Create message and send it
+ final Message message = marshaler.createMessage(exchange, in, session);
+ template.send(dest, new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ return message;
+ }
+ });
+ }
+
+ /**
+ * Process an InOut or InOptionalOut exchange.
+ * This method uses the JMS template to create a session and call the
+ * {@link #processInOutInSession(javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage, javax.jms.Session)}
+ * method.
+ *
+ * @param exchange
+ * @param in
+ * @throws Exception
+ */
+ protected void processInOut(final MessageExchange exchange,
+ final NormalizedMessage in) throws Exception {
SessionCallback callback = new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
try {
- processInOutInSession(exchange, in, out, session);
+ processInOutInSession(exchange, in, session);
return null;
} catch (JMSException e) {
throw e;
@@ -391,71 +514,170 @@
template.execute(callback, true);
}
- protected void processInOutInSession(final MessageExchange exchange,
- final NormalizedMessage in,
- final NormalizedMessage out,
+ /**
+ * Process an InOut or InOptionalOut exchange inside a JMS session.
+ * This method delegates the JMS message creation to the marshaler and uses
+ * the JMS template to send it. If the reply destination chosen for this
+ * exchange is not the default reply destination, it synchronously waits for
+ * the response using a JMS selector and sends the populated exchange back.
+ * Else, the exchange is kept in the store under its correlation id and the
+ * method just returns: the response message will come back from the
+ * listener container.
+ *
+ * @param exchange the JBI exchange being processed
+ * @param in the JBI in message to marshal into the JMS request
+ * @param session the JMS session used to resolve destinations and create the message
+ * @throws Exception if the message can not be created or sent, or if no
+ * response is received in the synchronous case
+ */
+ protected void processInOutInSession(final MessageExchange exchange,
+ final NormalizedMessage in,
final Session session) throws Exception {
// Create destinations
final Destination dest = getDestination(exchange, in, session);
- final Destination replyDest = getReplyDestination(exchange, out, session);
+ final Destination replyDest = getReplyDestination(exchange, in, session);
// Create message and send it
final Message sendJmsMsg = marshaler.createMessage(exchange, in, session);
- sendJmsMsg.setJMSReplyTo(replyDest);
+ sendJmsMsg.setJMSReplyTo(replyDest);
+ // handle correlation ID: reuse the JMS message id when the marshaler set one,
+ // else fall back to the JBI exchange id
+ String correlationId = sendJmsMsg.getJMSMessageID() != null ? sendJmsMsg.getJMSMessageID() : exchange.getExchangeId();
+ sendJmsMsg.setJMSCorrelationID(correlationId);
+
+ // Responses on the default reply destination are handled by the listener container
+ boolean asynchronous = replyDest.equals(replyDestination);
+
+ if (asynchronous) {
+ store.store(correlationId, exchange);
+ }
+
+ try {
+ template.send(dest, new MessageCreator() {
+ public Message createMessage(Session session)
+ throws JMSException {
+ return sendJmsMsg;
+ }
+ });
+ } catch (Exception e) {
+ if (asynchronous) {
+ // Take the pending exchange back out using the same key it was stored
+ // under; the exchange id is only that key when no JMS message id was set
+ store.load(correlationId);
+ }
+ throw e;
+ }
- template.send(dest, new MessageCreator() {
- public Message createMessage(Session session)
- throws JMSException {
- return sendJmsMsg;
+ if (!asynchronous) {
+ // Create selector
+ String jmsId = sendJmsMsg.getJMSMessageID();
+ String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
+ Message receiveJmsMsg;
+ synchronized (template) {
+ // Receiving JMS Message, Creating and Returning NormalizedMessage out
+ receiveJmsMsg = template.receiveSelected(replyDest, selector);
+ if (receiveJmsMsg == null) {
+ throw new IllegalStateException("Unable to receive response");
+ }
}
- });
-
- // Create selector
- String jmsId = sendJmsMsg.getJMSMessageID();
- String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
- Message receiveJmsMsg;
- synchronized (template) {
- // Receiving JMS Message, Creating and Returning NormalizedMessage out
- receiveJmsMsg = template.receiveSelected(replyDest, selector);
- if (receiveJmsMsg == null) {
- throw new IllegalStateException("Unable to receive response");
+ NormalizedMessage out = exchange.getMessage("out");
+ if (out == null) {
+ out = exchange.createMessage();
+ exchange.setMessage(out, "out");
+ }
+ marshaler.populateMessage(receiveJmsMsg, exchange, out);
+ boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+ if (txSync) {
+ sendSync(exchange);
+ } else {
+ send(exchange);
}
- }
- marshaler.populateMessage(receiveJmsMsg, exchange, out);
+ }
}
- protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
- Object dest = null;
- // Let the destinationChooser a chance to choose the destination
- if (destinationChooser != null) {
- dest = destinationChooser.chooseDestination(exchange, message);
+ /**
+ * Process a JMS response message.
+ * The pending JBI exchange is loaded from the store using the JMS
+ * correlation id of the message. This method then delegates to the
+ * marshaler for the JBI out message creation and sends it in to the NMR.
+ * If no exchange can be loaded for the message, the error is logged and
+ * the message is ignored.
+ *
+ * @param message the JMS response message received by the listener container
+ */
+ protected void onMessage(Message message) {
+ MessageExchange exchange = null;
+ try {
+ exchange = (InOut) store.load(message.getJMSCorrelationID());
+ if (exchange == null) {
+ throw new IllegalStateException("Could not find exchange " + message.getJMSCorrelationID());
+ }
+ } catch (Exception e) {
+ logger.error("Unable to load exchange related to incoming JMS message " + message, e);
+ // Without an exchange there is nothing to populate or send back
+ return;
}
- // Default to destinationName properties
- if (dest == null) {
- dest = destinationName;
+ try {
+ NormalizedMessage out = exchange.getMessage("out");
+ if (out == null) {
+ out = exchange.createMessage();
+ exchange.setMessage(out, "out");
+ }
+ marshaler.populateMessage(message, exchange, out);
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Error while populating JBI exchange " + exchange, e);
+ }
+ exchange.setError(e);
}
- // Resolve destination if needed
- if (dest instanceof Destination) {
- return (Destination) dest;
- } else if (dest instanceof String) {
- return destinationResolver.resolveDestinationName(session,
- (String) dest,
- isPubSubDomain());
+ try {
+ boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+ if (txSync) {
+ sendSync(exchange);
+ } else {
+ send(exchange);
+ }
+ } catch (Exception e) {
+ logger.error("Unable to send JBI exchange " + exchange, e);
}
- throw new IllegalStateException("Unable to choose destination for exchange " + exchange);
}
+ /**
+ * Retrieve the destination where the JMS message should be sent to.
+ *
+ * @param exchange
+ * @param message
+ * @param session
+ * @return
+ * @throws JMSException
+ */
+ protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
+ return chooseDestination(exchange, message, session, destinationChooser, destination);
+ }
+
+ /**
+ * Choose the JMS destination for the reply message
+ *
+ * @param exchange
+ * @param message
+ * @param session
+ * @return
+ * @throws JMSException
+ */
protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
+ return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
+ }
+
+ /**
+ * Choose a JMS destination given the chooser and a default destination
+ * @param exchange
+ * @param message
+ * @param session
+ * @param chooser
+ * @param defaultDestination
+ * @return
+ * @throws JMSException
+ */
+ protected Destination chooseDestination(MessageExchange exchange,
+ Object message,
+ Session session,
+ DestinationChooser chooser,
+ Destination defaultDestination) throws JMSException {
Object dest = null;
- // Let the destinationChooser a chance to choose the destination
- if (destinationChooser != null) {
- dest = destinationChooser.chooseDestination(exchange, message);
+ // Give the chooser a chance to choose the destination
+ if (chooser != null) {
+ dest = chooser.chooseDestination(exchange, message);
}
- // Default to replyDestination / replyDestinationName properties
+ // Default to defaultDestination properties
if (dest == null) {
- dest = replyDestination;
- }
- if (dest == null) {
- dest = replyDestinationName;
+ dest = defaultDestination;
}
// Resolve destination if needed
if (dest instanceof Destination) {
@@ -465,21 +687,55 @@
(String) dest,
isPubSubDomain());
}
- throw new IllegalStateException("Unable to choose replyDestination for exchange " + exchange);
+ throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
}
-
+
+ /**
+ * Start this endpoint.
+ *
+ * @throws Exception
+ */
public synchronized void start() throws Exception {
- template = createTemplate();
- if (store == null && !stateless) {
+ if (store == null) {
if (storeFactory == null) {
storeFactory = new MemoryStoreFactory();
}
store = storeFactory.open(getService().toString() + getEndpoint());
}
+ template = createTemplate();
+ // Obtain the default destination
+ if (destination == null && destinationName != null) {
+ destination = (Destination) template.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
+ }
+ });
+ }
+ // Obtain the default reply destination
+ if (replyDestination == null && replyDestinationName != null) {
+ replyDestination = (Destination) template.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
+ }
+ });
+ }
+ // create the listener container
+ if (replyDestination != null) {
+ listenerContainer = createListenerContainer();
+ listenerContainer.start();
+ }
super.start();
}
-
+
+ /**
+ * Stops this endpoint.
+ *
+ * @throws Exception
+ */
public synchronized void stop() throws Exception {
+ if (listenerContainer != null) {
+ listenerContainer.stop();
+ }
if (store != null) {
if (storeFactory != null) {
storeFactory.close(store);
@@ -488,9 +744,13 @@
}
super.stop();
}
-
+
+ /**
+ * Validate this endpoint.
+ *
+ * @throws DeploymentException
+ */
public void validate() throws DeploymentException {
- // TODO: check service, endpoint
super.validate();
if (getService() == null) {
throw new DeploymentException("service must be set");
@@ -503,6 +763,11 @@
}
}
+ /**
+ * Create the JMS template to be used to send the JMS messages.
+ *
+ * @return
+ */
protected JmsTemplate createTemplate() {
JmsTemplate tplt;
if (isJms102()) {
@@ -528,6 +793,32 @@
tplt.setPubSubNoLocal(isPubSubNoLocal());
tplt.setTimeToLive(getTimeToLive());
tplt.setReceiveTimeout(getReceiveTimeout());
+ tplt.afterPropertiesSet();
return tplt;
}
+
+ /**
+ * Create the message listener container to receive response messages.
+ *
+ * @return
+ */
+ protected AbstractMessageListenerContainer createListenerContainer() {
+ DefaultMessageListenerContainer cont;
+ if (isJms102()) {
+ cont = new DefaultMessageListenerContainer102();
+ } else {
+ cont = new DefaultMessageListenerContainer();
+ }
+ cont.setConnectionFactory(getConnectionFactory());
+ cont.setDestination(getReplyDestination());
+ cont.setPubSubDomain(isPubSubDomain());
+ cont.setPubSubNoLocal(isPubSubNoLocal());
+ cont.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ JmsProviderEndpoint.this.onMessage(message);
+ }
+ });
+ cont.afterPropertiesSet();
+ return cont;
+ }
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=689094&r1=689093&r2=689094&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Tue Aug 26 07:33:16 2008
@@ -107,7 +107,7 @@
jmsTemplate.send("reply", new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage rep = session.createTextMessage(baos.toString());
- rep.setJMSCorrelationID(msg.getJMSMessageID());
+ rep.setJMSCorrelationID(msg.getJMSCorrelationID() != null ? msg.getJMSCorrelationID() : msg.getJMSMessageID());
return rep;
}
});