You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/01/14 07:02:01 UTC

svn commit: r734341 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Author: ffang
Date: Tue Jan 13 22:02:00 2009
New Revision: 734341

URL: http://svn.apache.org/viewvc?rev=734341&view=rev
Log:
[SM-1764]JmsProviderEndpointTest hang

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=734341&r1=734340&r2=734341&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Tue Jan 13 22:02:00 2009
@@ -16,39 +16,38 @@
  */
 package org.apache.servicemix.jms.endpoints;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
 import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.RobustInOnly;
 import javax.jbi.messaging.InOut;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.Fault;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Session;
 import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
-import javax.jms.Session;
 
-import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.jms.JmsEndpointType;
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.StoreFactory;
 import org.apache.servicemix.store.memory.MemoryStoreFactory;
-import org.springframework.jms.JmsException;
 import org.springframework.jms.UncategorizedJmsException;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.JmsTemplate102;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.AbstractPollingMessageListenerContainer;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jms.support.destination.DynamicDestinationResolver;
 
@@ -473,23 +472,22 @@
                 return;
             // Exchange is active
             } else {
+                NormalizedMessage in;
                 // Fault message
                 if (exchange.getFault() != null) {
                     done(exchange);
                 // In message
-                } else {
-                    NormalizedMessage in = exchange.getMessage("in");
-                    if (in != null) {
-                        if (exchange instanceof InOnly) {
-                            processInOnly(exchange, in);
-                            done(exchange);
-                        } else {
-                            processInOut(exchange, in);
-                        }
-                    // This is not compliant with the default MEPs
-                    } else {
-                        throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
+                } else if ((in = exchange.getMessage("in")) != null) {
+                    if (exchange instanceof InOnly) {
+                        processInOnly(exchange, in);
+                        done(exchange);
+                    }
+                    else {
+                        processInOut(exchange, in);
                     }
+                // This is not compliant with the default MEPs
+                } else {
+                    throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
                 }
             }
         // Unsupported role: this should never happen has we never create exchanges
@@ -511,18 +509,18 @@
     protected void processInOnly(final MessageExchange exchange,
                                  final NormalizedMessage in) throws Exception {
         SessionCallback callback = new SessionCallback() {
-            public Object doInJms(Session session) throws JMSException {
-                try {
-                    processInOnlyInSession(exchange, in, session);
-                    return null;
-                } catch (JMSException e) {
-                    throw e;
-                } catch (RuntimeException e) {
-                    throw e;
-                } catch (Exception e) {
-                    throw new UncategorizedJmsException(e);
-                }
-            }
+          public Object doInJms(Session session) throws JMSException {
+              try {
+                  processInOnlyInSession(exchange, in, session);
+                  return null;
+              } catch (JMSException e) {
+                  throw e;
+              } catch (RuntimeException e) {
+                  throw e;
+              } catch (Exception e) {
+                  throw new UncategorizedJmsException(e);
+              }
+          }
         };
         template.execute(callback, true);
     }
@@ -563,6 +561,19 @@
      */
     protected void processInOut(final MessageExchange exchange,
                                 final NormalizedMessage in) throws Exception {
+    	if (listenerContainer == null) {
+    		// Obtain the default reply destination
+    		if (replyDestination == null && replyDestinationName != null) {
+    			replyDestination = (Destination) template.execute(new SessionCallback() {
+    				public Object doInJms(Session session) throws JMSException {
+    					return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
+    				}
+    			});
+    		}
+    		//create the listener container
+    		listenerContainer = createListenerContainer();
+    		listenerContainer.start();
+    	}
         SessionCallback callback = new SessionCallback() {
             public Object doInJms(Session session) throws JMSException {
                 try {
@@ -609,12 +620,16 @@
         boolean asynchronous = replyDest.equals(replyDestination);
 
         if (asynchronous) {
-            createAndStartListener();
             store.store(correlationId, exchange);
         }
 
         try {
-            send(session, dest, sendJmsMsg);
+            template.send(dest, new MessageCreator() {
+                public Message createMessage(Session session)
+                    throws JMSException {
+                    return sendJmsMsg;
+                }
+            });
         } catch (Exception e) {
             if (asynchronous) {
                 store.load(exchange.getExchangeId());
@@ -624,9 +639,10 @@
 
         if (!asynchronous) {
             // Create selector
-            String selector = MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END;
+            String jmsId = sendJmsMsg.getJMSMessageID();
+            String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
             // Receiving JMS Message, Creating and Returning NormalizedMessage out
-            Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
+            Message receiveJmsMsg = template.receiveSelected(replyDest, selector);
             if (receiveJmsMsg == null) {
                 throw new IllegalStateException("Unable to receive response");
             }
@@ -662,48 +678,6 @@
         }
     }
 
-    private void send(final Session session, final Destination dest, final Message message) throws JmsException {
-        // Do not call directly the template to avoid the cost of creating a new connection / session
-//        template.send(dest, new MessageCreator() {
-//            public Message createMessage(Session session) throws JMSException {
-//                return message;
-//            }
-//        });
-        try {
-            Method method = JmsTemplate.class.getDeclaredMethod("doSend", Session.class, Destination.class, MessageCreator.class);
-            method.setAccessible(true);
-            method.invoke(template, session, dest, new MessageCreator() {
-                public Message createMessage(Session session) throws JMSException {
-                    return message;
-                }
-            });
-        } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-        } catch (InvocationTargetException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private Message receiveSelected(final Session session, 
-                                    final Destination dest, 
-                                    final String messageSelector) throws JMSException {
-        // Do not call directly the template to avoid the cost of creating a new connection / session
-//        return template.doReceive(session, dest, messageSelector);
-        try {
-            Method method = JmsTemplate.class.getDeclaredMethod("doReceive", Session.class, Destination.class, String.class);
-            method.setAccessible(true);
-            return (Message) method.invoke(template, session, dest, messageSelector);
-        } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-        } catch (InvocationTargetException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     /**
      * Process a JMS response message.
      * This method delegates to the marshaler for the JBI out message creation
@@ -773,18 +747,11 @@
      * @throws JMSException
      */
     protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        Destination dest = chooseDestination(exchange, message, session, destinationChooser, 
-                                             destination != null ? destination : destinationName);
-        if (dest == null) {
-            throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
-        }
-        return dest;
+        return chooseDestination(exchange, message, session, destinationChooser, destination);
     }
 
     /**
-     * Choose the JMS destination for the reply message.
-     * If no default destination is specified or can be extracted from the JBI exchange,
-     * a temporary destination will be created.
+     * Choose the JMS destination for the reply message
      *
      * @param exchange
      * @param message
@@ -793,17 +760,7 @@
      * @throws JMSException
      */
     protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        Destination dest = chooseDestination(exchange, message, session, replyDestinationChooser, 
-                                             replyDestination != null ? replyDestination : replyDestinationName);
-        if (dest == null) {
-            if (isPubSubDomain()) {
-                return session.createTemporaryQueue();
-            } else {
-                return session.createTemporaryTopic();
-            }
-        } else {
-            return dest;
-        }
+        return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
     }
 
     /**
@@ -820,7 +777,7 @@
                                             Object message,
                                             Session session,
                                             DestinationChooser chooser,
-                                            Object defaultDestination) throws JMSException {
+                                            Destination defaultDestination) throws JMSException {
         Object dest = null;
         // Let the replyDestinationChooser a chance to choose the destination
         if (chooser != null) {
@@ -838,7 +795,7 @@
                                                               (String) dest, 
                                                               isPubSubDomain());
         }
-        return null;
+        throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
     }
 
     /**
@@ -855,14 +812,15 @@
             store = storeFactory.open(getService().toString() + getEndpoint());
         }
         template = createTemplate();
-    }
-
-    protected synchronized void createAndStartListener() throws Exception {
-        if (listenerContainer == null) {
-            // create the listener container
-            listenerContainer = createListenerContainer();
-            listenerContainer.start();
+        // Obtain the default destination
+        if (destination == null && destinationName != null) {
+            destination = (Destination) template.execute(new SessionCallback() {
+                public Object doInJms(Session session) throws JMSException {
+                    return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
+                }
+            });
         }
+        
     }
 
     /**
@@ -874,7 +832,6 @@
         if (listenerContainer != null) {
             listenerContainer.stop();
             listenerContainer.shutdown();
-            listenerContainer = null;
         }
         if (store != null) {
             if (storeFactory != null) {
@@ -950,12 +907,12 @@
             cont = new DefaultMessageListenerContainer();
         }
         cont.setConnectionFactory(getConnectionFactory());
-        if (replyDestination != null) {
-            cont.setDestination(replyDestination);
-        }
-        if (replyDestinationName != null) {
-            cont.setDestinationName(replyDestinationName);
+        Destination replyDest = getReplyDestination();
+        if (replyDest == null) {
+        	replyDest = resolveOrCreateDestination(template, replyDestinationName, isPubSubDomain());
+        	setReplyDestination(replyDest);
         }
+        cont.setDestination(replyDest);
         cont.setPubSubDomain(isPubSubDomain());
         cont.setPubSubNoLocal(isPubSubNoLocal());
         cont.setMessageListener(new MessageListener() {
@@ -968,4 +925,31 @@
         return cont;
     }
     
+    /**
+     * If the reply destination name given is null then a temporary destination is created; otherwise the
+     * name is resolved using the destination resolver obtained from the given JmsTemplate.
+     *
+     * @param jmsTemplate template to use for session and resolver
+     * @param replyToDestinationName null for temporary destination or a destination name
+     * @param pubSubDomain true=pubSub, false=Queues
+     * @return resolved destination
+     */
+    private Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
+                                                          final String replyToDestinationName,
+                                                          final boolean pubSubDomain) {
+        return (Destination)jmsTemplate.execute(new SessionCallback() {
+            public Object doInJms(Session session) throws JMSException {
+                if (replyToDestinationName == null) {
+                        if (destination instanceof Queue) {
+                        return session.createTemporaryQueue();
+                    } else {
+                        return session.createTemporaryTopic();
+                    }
+                }
+                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+                return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
+            }
+        });
+    }
+
 }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=734341&r1=734340&r2=734341&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Tue Jan 13 22:02:00 2009
@@ -128,7 +128,7 @@
         endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
         endpoint.setEndpoint("HelloPort");
         endpoint.setDestinationName("destination");
-        endpoint.setConnectionFactory(connectionFactory);
+        endpoint.setConnectionFactory(new PooledConnectionFactory(connectionFactory));
         component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
         container.activateComponent(component, "servicemix-jms");