You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/08/26 16:33:17 UTC

svn commit: r689094 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Author: gnodet
Date: Tue Aug 26 07:33:16 2008
New Revision: 689094

URL: http://svn.apache.org/viewvc?rev=689094&view=rev
Log:
SM-1520: make the jms provider endpoint asynchronous

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=689094&r1=689093&r2=689094&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Tue Aug 26 07:33:16 2008
@@ -19,13 +19,19 @@
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.RobustInOnly;
+import javax.jbi.messaging.InOut;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
+import javax.jms.MessageListener;
 
 import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.jms.JmsEndpointType;
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.StoreFactory;
@@ -36,11 +42,15 @@
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.listener.AbstractPollingMessageListenerContainer;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jms.support.destination.DynamicDestinationResolver;
 
 /**
- * 
+ * A JMS provider endpoint
+ *
  * @author gnodet
  * @org.apache.xbean.XBean element="provider"
  * @since 3.2
@@ -52,6 +62,7 @@
 
     private JmsProviderMarshaler marshaler = new DefaultProviderMarshaler();
     private DestinationChooser destinationChooser = new SimpleDestinationChooser();
+    private DestinationChooser replyDestinationChooser = new SimpleDestinationChooser();
     private JmsTemplate template;
 
     private boolean jms102;
@@ -63,7 +74,7 @@
     private boolean messageIdEnabled = true;
     private boolean messageTimestampEnabled = true;
     private boolean pubSubNoLocal;
-    private long receiveTimeout = AbstractPollingMessageListenerContainer.DEFAULT_RECEIVE_TIMEOUT;
+    private long receiveTimeout = JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT;
     private boolean explicitQosEnabled;
     private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
     private int priority = Message.DEFAULT_PRIORITY;
@@ -71,11 +82,12 @@
 
     private Destination replyDestination;
     private String replyDestinationName;
-    
-    private boolean stateless;
+
     private StoreFactory storeFactory;
     private Store store;
-    
+
+    private AbstractMessageListenerContainer listenerContainer;
+
     /**
      * @return the destination
      */
@@ -164,6 +176,19 @@
     }
 
     /**
+     * @return the replyDestinationChooser
+     */
+    public DestinationChooser getReplyDestinationChooser() {
+        return replyDestinationChooser;
+    }
+
+    /**
+     * @param replyDestinationChooser the replyDestinationChooser to set
+     */
+    public void setReplyDestinationChooser(DestinationChooser replyDestinationChooser) {
+        this.replyDestinationChooser = replyDestinationChooser;
+    }
+    /**
      * @return the destinationResolver
      */
     public DestinationResolver getDestinationResolver() {
@@ -306,18 +331,16 @@
         this.timeToLive = timeToLive;
     }
 
-    public boolean isStateless() {
-        return stateless;
-    }
-
-    public void setStateless(boolean stateless) {
-        this.stateless = stateless;
-    }
-
     public StoreFactory getStoreFactory() {
         return storeFactory;
     }
 
+    /**
+     * Sets the store factory used to create the store.
+     * If none is set, a {@link MemoryStoreFactory} will be created and used instead.
+     *
+     * @param storeFactory
+     */
     public void setStoreFactory(StoreFactory storeFactory) {
         this.storeFactory = storeFactory;
     }
@@ -326,6 +349,12 @@
         return store;
     }
 
+    /**
+     * Sets the store used to store JBI exchanges that are waiting for a response
+     * JMS message.  The store will be automatically created if not set.
+     *
+     * @param store
+     */
     public void setStore(Store store) {
         this.store = store;
     }
@@ -334,6 +363,14 @@
         return replyDestination;
     }
 
+    /**
+     * Sets the reply destination.
+     * This JMS destination will be used as the default destination for the response
+     * messages when using an InOut JBI exchange.  It will be used if the
+     * <code>replyDestinationChooser</code> does not return any value.
+     *
+     * @param replyDestination
+     */
     public void setReplyDestination(Destination replyDestination) {
         this.replyDestination = replyDestination;
     }
@@ -342,42 +379,128 @@
         return replyDestinationName;
     }
 
+    /**
+     * Sets the name of the reply destination.
+     * This property will be used to create the <code>replyDestination</code>
+     * using the <code>destinationResolver</code> when the endpoint starts if
+     * the <code>replyDestination</code> has not been set.
+     *
+     * @param replyDestinationName
+     */
     public void setReplyDestinationName(String replyDestinationName) {
         this.replyDestinationName = replyDestinationName;
     }
 
-    protected void processInOnly(final MessageExchange exchange, final NormalizedMessage in) throws Exception {
-        MessageCreator creator = new MessageCreator() {
-            public Message createMessage(Session session) throws JMSException {
-                try {
-                    Message message = marshaler.createMessage(exchange, in, session);
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Sending message to: " + template.getDefaultDestinationName() + " message: " + message);
+    /**
+     * Process the incoming JBI exchange
+     * @param exchange
+     * @throws Exception
+     */
+    public void process(MessageExchange exchange) throws Exception {
+        // The component acts as a provider, this means that another component has requested our service
+        // As this exchange is active, this is either an in or a fault (out are sent by this component)
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            // Exchange is finished
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                return;
+            // Exchange has been aborted with an exception
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                return;
+            // Exchange is active
+            } else {
+                NormalizedMessage in;
+                // Fault message
+                if (exchange.getFault() != null) {
+                    done(exchange);
+                // In message
+                } else if ((in = exchange.getMessage("in")) != null) {
+                    if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
+                        processInOnly(exchange, in);
+                        done(exchange);
                     }
-                    return message;
-                } catch (Exception e) {
-                    JMSException jmsEx =  new JMSException("Failed to create JMS Message: " + e);
-                    jmsEx.setLinkedException(e);
-                    jmsEx.initCause(e);
-                    throw jmsEx;
+                    else {
+                        processInOut(exchange, in);
+                    }
+                // This is not compliant with the default MEPs
+                } else {
+                    throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
                 }
             }
-        };
-        Object dest = destinationChooser.chooseDestination(exchange, in);
-        if (dest instanceof Destination) {
-            template.send((Destination) dest, creator);
-        } else if (dest instanceof String) {
-            template.send((String) dest, creator);
+        // Unsupported role: this should never happen as we never create exchanges
         } else {
-            template.send(creator);
+            throw new IllegalStateException("Unsupported role: " + exchange.getRole());
         }
     }
 
-    protected void processInOut(final MessageExchange exchange, final NormalizedMessage in, final NormalizedMessage out) throws Exception {
+    /**
+     * Process an InOnly or RobustInOnly exchange.
+     * This method uses the JMS template to create a session and call the
+     * {@link #processInOnlyInSession(javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage, javax.jms.Session)}
+     * method.
+     *
+     * @param exchange
+     * @param in
+     * @throws Exception
+     */
+    protected void processInOnly(final MessageExchange exchange,
+                                 final NormalizedMessage in) throws Exception {
+        SessionCallback callback = new SessionCallback() {
+          public Object doInJms(Session session) throws JMSException {
+              try {
+                  processInOnlyInSession(exchange, in, session);
+                  return null;
+              } catch (JMSException e) {
+                  throw e;
+              } catch (RuntimeException e) {
+                  throw e;
+              } catch (Exception e) {
+                  throw new UncategorizedJmsException(e);
+              }
+          }
+        };
+        template.execute(callback, true);
+    }
+
+    /**
+     * Process an InOnly or RobustInOnly exchange inside a JMS session.
+     * This method delegates the JMS message creation to the marshaler and uses
+     * the JMS template to send it.
+     *
+     * @param exchange
+     * @param in
+     * @param session
+     * @throws Exception
+     */
+    protected void processInOnlyInSession(final MessageExchange exchange,
+                                          final NormalizedMessage in,
+                                          final Session session) throws Exception {
+        // Create destination
+        final Destination dest = getDestination(exchange, in, session);
+        // Create message and send it
+        final Message message = marshaler.createMessage(exchange, in, session);
+        template.send(dest, new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                return message;
+            }
+        });
+    }
+
+    /**
+     * Process an InOut or InOptionalOut exchange.
+     * This method uses the JMS template to create a session and call the
+     * {@link #processInOutInSession(javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage, javax.jms.Session)}
+     * method.
+     *
+     * @param exchange
+     * @param in
+     * @throws Exception
+     */
+    protected void processInOut(final MessageExchange exchange,
+                                final NormalizedMessage in) throws Exception {
         SessionCallback callback = new SessionCallback() {
             public Object doInJms(Session session) throws JMSException {
                 try {
-                    processInOutInSession(exchange, in, out, session);
+                    processInOutInSession(exchange, in, session);
                     return null;
                 } catch (JMSException e) {
                     throw e;
@@ -391,71 +514,170 @@
         template.execute(callback, true);
     }
     
-    protected void processInOutInSession(final MessageExchange exchange, 
-                                         final NormalizedMessage in, 
-                                         final NormalizedMessage out, 
+    /**
+     * Process an InOut or InOptionalOut exchange inside a JMS session.
+     * This method delegates the JMS message creation to the marshaler and uses
+     * the JMS template to send it.  If the reply destination chosen for the
+     * exchange is not the default one, it synchronously waits for the response
+     * to come back using a JMS selector.  Otherwise, it just returns and the
+     * response message will come back from the listener container.
+     *
+     * @param exchange
+     * @param in
+     * @param session
+     * @throws Exception
+     */
+    protected void processInOutInSession(final MessageExchange exchange,
+                                         final NormalizedMessage in,
                                          final Session session) throws Exception {
         // Create destinations
         final Destination dest = getDestination(exchange, in, session);
-        final Destination replyDest = getReplyDestination(exchange, out, session);
+        final Destination replyDest = getReplyDestination(exchange, in, session);
         // Create message and send it
         final Message sendJmsMsg = marshaler.createMessage(exchange, in, session);
-        sendJmsMsg.setJMSReplyTo(replyDest);  
+        sendJmsMsg.setJMSReplyTo(replyDest);
+        // handle correlation ID
+        String correlationId = sendJmsMsg.getJMSMessageID() != null ? sendJmsMsg.getJMSMessageID() : exchange.getExchangeId(); 
+        sendJmsMsg.setJMSCorrelationID(correlationId);
+
+        boolean asynchronous = replyDest.equals(replyDestination);
+
+        if (asynchronous) {
+            store.store(correlationId, exchange);
+        }
+
+        try {
+            template.send(dest, new MessageCreator() {
+                public Message createMessage(Session session)
+                    throws JMSException {
+                    return sendJmsMsg;
+                }
+            });
+        } catch (Exception e) {
+            if (asynchronous) {
+                store.load(exchange.getExchangeId());
+            }
+            throw e;
+        }
 
-        template.send(dest, new MessageCreator() {
-            public Message createMessage(Session session)
-                throws JMSException {
-                return sendJmsMsg;
+        if (!asynchronous) {
+            // Create selector
+            String jmsId = sendJmsMsg.getJMSMessageID();
+            String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
+            Message receiveJmsMsg;
+            synchronized (template) {
+                // Receiving JMS Message, Creating and Returning NormalizedMessage out
+                receiveJmsMsg = template.receiveSelected(replyDest, selector);
+                if (receiveJmsMsg == null) {
+                    throw new IllegalStateException("Unable to receive response");
+                }
             }
-        });
-        
-        // Create selector
-        String jmsId = sendJmsMsg.getJMSMessageID();
-        String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
-        Message receiveJmsMsg;
-        synchronized (template) {
-            // Receiving JMS Message, Creating and Returning NormalizedMessage out
-            receiveJmsMsg = template.receiveSelected(replyDest, selector);
-            if (receiveJmsMsg == null) {
-                throw new IllegalStateException("Unable to receive response");
+            NormalizedMessage out = exchange.getMessage("out");
+            if (out == null) {
+                out = exchange.createMessage();
+                exchange.setMessage(out, "out");
+            }
+            marshaler.populateMessage(receiveJmsMsg, exchange, out);
+            boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+            if (txSync) {
+                sendSync(exchange);
+            } else {
+                send(exchange);
             }
-        }    
-        marshaler.populateMessage(receiveJmsMsg, exchange, out);
+        }
     }
 
-    protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        Object dest = null;
-        // Let the destinationChooser a chance to choose the destination 
-        if (destinationChooser != null) {
-            dest = destinationChooser.chooseDestination(exchange, message);
+    /**
+     * Process a JMS response message.
+     * This method delegates to the marshaler for the JBI out message creation
+     * and sends it in to the NMR.
+     * 
+     * @param message
+     */
+    protected void onMessage(Message message) {
+        MessageExchange exchange = null;
+        try {
+            exchange = (InOut) store.load(message.getJMSCorrelationID());
+            if (exchange == null) {
+                throw new IllegalStateException("Could not find exchange " + message.getJMSCorrelationID());
+            }
+        } catch (Exception e) {
+            logger.error("Unable to load exchange related to incoming JMS message " + message, e);
         }
-        // Default to destinationName properties
-        if (dest == null) {
-            dest = destinationName;
+        try {
+            NormalizedMessage out = exchange.getMessage("out");
+            if (out == null) {
+                out = exchange.createMessage();
+                exchange.setMessage(out, "out");
+            }
+            marshaler.populateMessage(message, exchange, out);
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Error while populating JBI exchange " + exchange, e);
+            }
+            exchange.setError(e);
         }
-        // Resolve destination if needed
-        if (dest instanceof Destination) {
-            return (Destination) dest;
-        } else if (dest instanceof String) {
-            return destinationResolver.resolveDestinationName(session, 
-                                                              (String) dest, 
-                                                              isPubSubDomain());
+        try {
+            boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+            if (txSync) {
+                sendSync(exchange);
+            } else {
+                send(exchange);
+            }
+        } catch (Exception e) {
+            logger.error("Unable to send JBI exchange " + exchange, e);
         }
-        throw new IllegalStateException("Unable to choose destination for exchange " + exchange);
     }
 
+    /**
+     * Retrieve the destination where the JMS message should be sent to.
+     *
+     * @param exchange
+     * @param message
+     * @param session
+     * @return
+     * @throws JMSException
+     */
+    protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
+        return chooseDestination(exchange, message, session, destinationChooser, destination);
+    }
+
+    /**
+     * Choose the JMS destination for the reply message
+     *
+     * @param exchange
+     * @param message
+     * @param session
+     * @return
+     * @throws JMSException
+     */
     protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
+        return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
+    }
+
+    /**
+     * Choose a JMS destination given a chooser and a default destination.
+     * @param exchange
+     * @param message
+     * @param session
+     * @param chooser
+     * @param defaultDestination
+     * @return
+     * @throws JMSException
+     */
+    protected Destination chooseDestination(MessageExchange exchange,
+                                            Object message,
+                                            Session session,
+                                            DestinationChooser chooser,
+                                            Destination defaultDestination) throws JMSException {
         Object dest = null;
-        // Let the destinationChooser a chance to choose the destination 
-        if (destinationChooser != null) {
-            dest = destinationChooser.chooseDestination(exchange, message);
+        // Give the chooser a chance to choose the destination
+        if (chooser != null) {
+            dest = chooser.chooseDestination(exchange, message);
         }
-        // Default to replyDestination / replyDestinationName properties
+        // Default to defaultDestination properties
         if (dest == null) {
-            dest = replyDestination;
-        }
-        if (dest == null) {
-            dest = replyDestinationName;
+            dest = defaultDestination;
         }
         // Resolve destination if needed
         if (dest instanceof Destination) {
@@ -465,21 +687,55 @@
                                                               (String) dest, 
                                                               isPubSubDomain());
         }
-        throw new IllegalStateException("Unable to choose replyDestination for exchange " + exchange);
+        throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
     }
-    
+
+    /**
+     * Start this endpoint.
+     *
+     * @throws Exception
+     */
     public synchronized void start() throws Exception {
-        template = createTemplate();
-        if (store == null && !stateless) {
+        if (store == null) {
             if (storeFactory == null) {
                 storeFactory = new MemoryStoreFactory();
             }
             store = storeFactory.open(getService().toString() + getEndpoint());
         }
+        template = createTemplate();
+        // Obtain the default destination
+        if (destination == null && destinationName != null) {
+            destination = (Destination) template.execute(new SessionCallback() {
+                public Object doInJms(Session session) throws JMSException {
+                    return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
+                }
+            });
+        }
+        // Obtain the default reply destination
+        if (replyDestination == null && replyDestinationName != null) {
+            replyDestination = (Destination) template.execute(new SessionCallback() {
+                public Object doInJms(Session session) throws JMSException {
+                    return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
+                }
+            });
+        }
+        // create the listener container
+        if (replyDestination != null) {
+            listenerContainer = createListenerContainer();
+            listenerContainer.start();
+        }
         super.start();
     }
-    
+
+    /**
+     * Stops this endpoint.
+     * 
+     * @throws Exception
+     */
     public synchronized void stop() throws Exception {
+        if (listenerContainer != null) {
+            listenerContainer.stop();
+        }
         if (store != null) {
             if (storeFactory != null) {
                 storeFactory.close(store);
@@ -488,9 +744,13 @@
         }
         super.stop();
     }
-    
+
+    /**
+     * Validate this endpoint.
+     * 
+     * @throws DeploymentException
+     */
     public void validate() throws DeploymentException {
-        // TODO: check service, endpoint
         super.validate();
         if (getService() == null) {
             throw new DeploymentException("service must be set");
@@ -503,6 +763,11 @@
         }
     }
 
+    /**
+     * Create the JMS template to be used to send the JMS messages.
+     *
+     * @return
+     */
     protected JmsTemplate createTemplate() {
         JmsTemplate tplt;
         if (isJms102()) {
@@ -528,6 +793,32 @@
         tplt.setPubSubNoLocal(isPubSubNoLocal());
         tplt.setTimeToLive(getTimeToLive());
         tplt.setReceiveTimeout(getReceiveTimeout());
+        tplt.afterPropertiesSet();
         return tplt;
     }
+
+    /**
+     * Create the message listener container to receive response messages.
+     * 
+     * @return
+     */
+    protected AbstractMessageListenerContainer createListenerContainer() {
+        DefaultMessageListenerContainer cont;
+        if (isJms102()) {
+            cont = new DefaultMessageListenerContainer102();
+        } else {
+            cont = new DefaultMessageListenerContainer();
+        }
+        cont.setConnectionFactory(getConnectionFactory());
+        cont.setDestination(getReplyDestination());
+        cont.setPubSubDomain(isPubSubDomain());
+        cont.setPubSubNoLocal(isPubSubNoLocal());
+        cont.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                JmsProviderEndpoint.this.onMessage(message);
+            }
+        });
+        cont.afterPropertiesSet();
+        return cont;
+    }
 }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=689094&r1=689093&r2=689094&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Tue Aug 26 07:33:16 2008
@@ -107,7 +107,7 @@
                     jmsTemplate.send("reply", new MessageCreator() {
                         public Message createMessage(Session session) throws JMSException {
                             TextMessage rep = session.createTextMessage(baos.toString());
-                            rep.setJMSCorrelationID(msg.getJMSMessageID());
+                            rep.setJMSCorrelationID(msg.getJMSCorrelationID() != null ? msg.getJMSCorrelationID() : msg.getJMSMessageID());
                             return rep;
                         }
                     });