You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/01/12 18:53:41 UTC

svn commit: r733850 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Author: gnodet
Date: Mon Jan 12 09:53:35 2009
New Revision: 733850

URL: http://svn.apache.org/viewvc?rev=733850&view=rev
Log:
SM-1760: smx-jms provider should not create a temporary replyTo destination unless one is needed

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=733850&r1=733849&r2=733850&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Mon Jan 12 09:53:35 2009
@@ -16,6 +16,9 @@
  */
 package org.apache.servicemix.jms.endpoints;
 
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
+
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
@@ -32,6 +35,7 @@
 import javax.jms.Session;
 import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
+import javax.jms.MessageProducer;
 
 import org.apache.servicemix.common.endpoints.ProviderEndpoint;
 import org.apache.servicemix.common.JbiConstants;
@@ -40,6 +44,7 @@
 import org.apache.servicemix.store.StoreFactory;
 import org.apache.servicemix.store.memory.MemoryStoreFactory;
 import org.springframework.jms.UncategorizedJmsException;
+import org.springframework.jms.JmsException;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.JmsTemplate102;
 import org.springframework.jms.core.MessageCreator;
@@ -50,6 +55,7 @@
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jms.support.destination.DynamicDestinationResolver;
+import org.springframework.jms.support.JmsUtils;
 
 /**
  * A Spring-based JMS provider endpoint
@@ -607,16 +613,12 @@
         boolean asynchronous = replyDest.equals(replyDestination);
 
         if (asynchronous) {
+            createAndStartListener();
             store.store(correlationId, exchange);
         }
 
         try {
-            template.send(dest, new MessageCreator() {
-                public Message createMessage(Session session)
-                    throws JMSException {
-                    return sendJmsMsg;
-                }
-            });
+            send(session, dest, sendJmsMsg);
         } catch (Exception e) {
             if (asynchronous) {
                 store.load(exchange.getExchangeId());
@@ -626,10 +628,9 @@
 
         if (!asynchronous) {
             // Create selector
-            String jmsId = sendJmsMsg.getJMSMessageID();
-            String selector = MSG_SELECTOR_START + jmsId + MSG_SELECTOR_END;
+            String selector = MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END;
             // Receiving JMS Message, Creating and Returning NormalizedMessage out
-            Message receiveJmsMsg = template.receiveSelected(replyDest, selector);
+            Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
             if (receiveJmsMsg == null) {
                 throw new IllegalStateException("Unable to receive response");
             }
@@ -665,6 +666,47 @@
         }
     }
 
+    private void send(final Session session, final Destination destination, final Message message) throws JmsException {
+        // Do not call directly the template to avoid the cost of creating a new connection / session
+//            template.send(dest, new MessageCreator() {
+//                public Message createMessage(Session session)
+//                    throws JMSException {
+//                    return sendJmsMsg;
+//                }
+//            });
+        try {
+            Method method = JmsTemplate.class.getDeclaredMethod("doSend", Session.class, Destination.class, MessageCreator.class);
+            method.setAccessible(true);
+            method.invoke(template, session, destination, new MessageCreator() {
+                public Message createMessage(Session session) throws JMSException {
+                    return message;
+                }
+            });
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Message receiveSelected(final Session session, final Destination destination, final String messageSelector) throws JMSException {
+        // Do not call directly the template to avoid the cost of creating a new connection / session
+//        return template.doReceive(session, destination, messageSelector);
+        try {
+            Method method = JmsTemplate.class.getDeclaredMethod("doReceive", Session.class, Destination.class, String.class);
+            method.setAccessible(true);
+            return (Message) method.invoke(template, session, destination, messageSelector);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Process a JMS response message.
      * This method delegates to the marshaler for the JBI out message creation
@@ -734,11 +776,17 @@
      * @throws JMSException
      */
     protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        return chooseDestination(exchange, message, session, destinationChooser, destination);
+        Destination dest = chooseDestination(exchange, message, session, destinationChooser, destination != null ? destination : destinationName);
+        if (dest == null) {
+            throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
+        }
+        return dest;
     }
 
     /**
-     * Choose the JMS destination for the reply message
+     * Choose the JMS destination for the reply message.
+     * If no default destination is specified or can be extracted from the JBI exchange,
+     * a temporary destination will be created.
      *
      * @param exchange
      * @param message
@@ -747,7 +795,16 @@
      * @throws JMSException
      */
     protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        return chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination);
+        Destination dest = chooseDestination(exchange, message, session, replyDestinationChooser, replyDestination != null ? replyDestination : replyDestinationName);
+        if (dest == null) {
+            if (isPubSubDomain()) {
+                return session.createTemporaryQueue();
+            } else {
+                return session.createTemporaryTopic();
+            }
+        } else {
+            return dest;
+        }
     }
 
     /**
@@ -764,7 +821,7 @@
                                             Object message,
                                             Session session,
                                             DestinationChooser chooser,
-                                            Destination defaultDestination) throws JMSException {
+                                            Object defaultDestination) throws JMSException {
         Object dest = null;
         // Let the replyDestinationChooser a chance to choose the destination
         if (chooser != null) {
@@ -782,7 +839,7 @@
                                                               (String) dest, 
                                                               isPubSubDomain());
         }
-        throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
+        return null;
     }
 
     /**
@@ -799,25 +856,14 @@
             store = storeFactory.open(getService().toString() + getEndpoint());
         }
         template = createTemplate();
-        // Obtain the default destination
-        if (destination == null && destinationName != null) {
-            destination = (Destination) template.execute(new SessionCallback() {
-                public Object doInJms(Session session) throws JMSException {
-                    return destinationResolver.resolveDestinationName(session, destinationName, isPubSubDomain());
-                }
-            });
-        }
-        // Obtain the default reply destination
-        if (replyDestination == null && replyDestinationName != null) {
-            replyDestination = (Destination) template.execute(new SessionCallback() {
-                public Object doInJms(Session session) throws JMSException {
-                    return destinationResolver.resolveDestinationName(session, replyDestinationName, isPubSubDomain());
-                }
-            });
+    }
+
+    protected synchronized void createAndStartListener() throws Exception {
+        if (listenerContainer == null) {
+            // create the listener container
+            listenerContainer = createListenerContainer();
+            listenerContainer.start();
         }
-        // create the listener container
-        listenerContainer = createListenerContainer();
-        listenerContainer.start();
     }
 
     /**
@@ -829,6 +875,7 @@
         if (listenerContainer != null) {
             listenerContainer.stop();
             listenerContainer.shutdown();
+            listenerContainer = null;
         }
         if (store != null) {
             if (storeFactory != null) {
@@ -904,12 +951,12 @@
             cont = new DefaultMessageListenerContainer();
         }
         cont.setConnectionFactory(getConnectionFactory());
-        Destination replyDest = getReplyDestination();
-        if (replyDest == null) {
-        	replyDest = resolveOrCreateDestination(template, replyDestinationName, isPubSubDomain());
-        	setReplyDestination(replyDest);
+        if (replyDestination != null) {
+            cont.setDestination(replyDestination);
+        }
+        if (replyDestinationName != null) {
+            cont.setDestinationName(replyDestinationName);
         }
-        cont.setDestination(replyDest);
         cont.setPubSubDomain(isPubSubDomain());
         cont.setPubSubNoLocal(isPubSubNoLocal());
         cont.setMessageListener(new MessageListener() {
@@ -922,31 +969,4 @@
         return cont;
     }
     
-    /**
-     * If the destinationName given is null then a temporary destination is created else the destination name
-     * is resolved using the resolver from the jmsConfig
-     *
-     * @param jmsTemplate template to use for session and resolver
-     * @param replyToDestinationName null for temporary destination or a destination name
-     * @param pubSubDomain true=pubSub, false=Queues
-     * @return resolved destination
-     */
-    private Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
-                                                          final String replyToDestinationName,
-                                                          final boolean pubSubDomain) {
-        return (Destination)jmsTemplate.execute(new SessionCallback() {
-            public Object doInJms(Session session) throws JMSException {
-                if (replyToDestinationName == null) {
-                        if (destination instanceof Queue) {
-                        return session.createTemporaryQueue();
-                    } else {
-                        return session.createTemporaryTopic();
-                    }
-                }
-                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
-                return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
-            }
-        });
-    }
-
 }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=733850&r1=733849&r2=733850&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Mon Jan 12 09:53:35 2009
@@ -95,6 +95,79 @@
         assertNotNull(msg);
     }
     
+    public void testProviderInOnlyWithoutReplyDest() throws Exception {
+        JmsComponent component = new JmsComponent();
+
+        JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
+        endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
+        endpoint.setEndpoint("HelloPort");
+        endpoint.setDestinationName("destination");
+        endpoint.setConnectionFactory(connectionFactory);
+        component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
+        container.activateComponent(component, "servicemix-jms");
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml").getInputStream(), baos);
+        InOnly me = client.createInOnlyExchange();
+        me.getInMessage().setContent(new StringSource(baos.toString()));
+
+        me.setOperation(new QName("uri:HelloWorld", "OneWay"));
+        me.setService(new QName("uri:HelloWorld", "HelloService"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+
+        Message msg = jmsTemplate.receive("destination");
+        assertNotNull(msg);
+        System.err.println(((TextMessage) msg).getText());
+    }
+
+    public void testProviderInOutWithoutReplyDest() throws Exception {
+        JmsComponent component = new JmsComponent();
+
+        JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
+        endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
+        endpoint.setEndpoint("HelloPort");
+        endpoint.setDestinationName("destination");
+        endpoint.setConnectionFactory(connectionFactory);
+        component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
+        container.activateComponent(component, "servicemix-jms");
+
+        Thread th = new Thread() {
+            public void run() {
+                try {
+                    final Message msg = jmsTemplate.receive("destination");
+                    assertNotNull(msg);
+                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Output.xml")
+                                .getInputStream(), baos);
+                    jmsTemplate.send(msg.getJMSReplyTo(), new MessageCreator() {
+                        public Message createMessage(Session session) throws JMSException {
+                            TextMessage rep = session.createTextMessage(baos.toString());
+                            rep.setJMSCorrelationID(msg.getJMSCorrelationID() != null ? msg.getJMSCorrelationID() : msg.getJMSMessageID());
+                            return rep;
+                        }
+                    });
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        th.start();
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml").getInputStream(), baos);
+        InOut me = client.createInOutExchange();
+        me.getInMessage().setContent(new StringSource(baos.toString()));
+        me.setOperation(new QName("uri:HelloWorld", "OneWay"));
+        me.setService(new QName("uri:HelloWorld", "HelloService"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getOutMessage());
+        assertNotNull(me.getOutMessage().getContent());
+        System.err.println(new SourceTransformer().contentToString(me.getOutMessage()));
+        client.done(me);
+    }
+
     public void testSoapProviderInOnly() throws Exception {
         JmsComponent component = new JmsComponent();
         
@@ -116,10 +189,6 @@
         me.setService(new QName("uri:HelloWorld", "HelloService"));
         client.sendSync(me);
         assertEquals(ExchangeStatus.DONE, me.getStatus());
-        
-        Message msg = jmsTemplate.receive("destination");
-        assertNotNull(msg);
-        System.err.println(((TextMessage) msg).getText());
     }
     
     public void testSoapProviderInOut() throws Exception {
@@ -169,7 +238,6 @@
         assertNotNull(me.getOutMessage().getContent());
         System.err.println(new SourceTransformer().contentToString(me.getOutMessage()));
         client.done(me);
-        
     }
 
     public void testSoapProviderInOutWithoutReplyDest() throws Exception {