You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/01/14 14:48:32 UTC

svn commit: r734397 - in /servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src: main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Author: gnodet
Date: Wed Jan 14 05:48:12 2009
New Revision: 734397

URL: http://svn.apache.org/viewvc?rev=734397&view=rev
Log:
SM-1760, SM-1764: Fix jms provider endpoint

Modified:
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=734397&r1=734396&r2=734397&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Wed Jan 14 05:48:12 2009
@@ -61,7 +61,7 @@
  */
 public class JmsProviderEndpoint extends ProviderEndpoint implements JmsEndpointType {
 
-    private static final String MSG_SELECTOR_START = "JMSCorrelationID='";    
+    private static final String MSG_SELECTOR_START = "JMSCorrelationID='";
     private static final String MSG_SELECTOR_END = "'";
 
     private JmsProviderMarshaler marshaler = new DefaultProviderMarshaler();
@@ -116,8 +116,8 @@
     }
 
     /**
-    * Specifies a string identifying the JMS destination used to send 
-     * messages. The destination is resolved using the 
+    * Specifies a string identifying the JMS destination used to send
+     * messages. The destination is resolved using the
      * <code>DesitinationResolver</code>.
      *
      * @param destinationName the destination name
@@ -136,9 +136,9 @@
     }
 
     /**
-    * Specifies if the provider uses JMS 1.0.2 compliant APIs. Defaults to 
+    * Specifies if the provider uses JMS 1.0.2 compliant APIs. Defaults to
     * <code>false</code>.
-    * 
+    *
      * @param jms102 provider is JMS 1.0.2 compliant?
      */
     public void setJms102(boolean jms102) {
@@ -169,7 +169,7 @@
     }
 
     /**
-    * Specifies the JMS delivery mode used for the reply. Defaults to 
+    * Specifies the JMS delivery mode used for the reply. Defaults to
     * (2)(<code>PERSISTENT</code>).
     *
      * @param deliveryMode the JMS delivery mode
@@ -186,7 +186,7 @@
     }
 
     /**
-    * Specifies a class implementing logic for choosing the destination used 
+    * Specifies a class implementing logic for choosing the destination used
     * to send messages.
     *
      * @param destinationChooser the destination chooser for sending messages
@@ -206,7 +206,7 @@
     }
 
     /**
-    * Specifies a class implementing logic for choosing the destination used 
+    * Specifies a class implementing logic for choosing the destination used
     * to recieve replies.
     *
      * @param replyDestinationChooser the destination chooser used for the reply destination
@@ -222,7 +222,7 @@
     }
 
     /**
-    * Specifies the class implementing logic for converting strings into 
+    * Specifies the class implementing logic for converting strings into
     * destinations. The default is <code>DynamicDestinationResolver</code>.
     *
      * @param destinationResolver the destination resolver implementation
@@ -239,7 +239,7 @@
     }
 
     /**
-    * Specifies if the QoS values specified for the endpoint are explicitly 
+    * Specifies if the QoS values specified for the endpoint are explicitly
     * used when a messages is sent. The default is <code>false</code>.
     *
      * @param explicitQosEnabled should the QoS values be sent?
@@ -256,8 +256,8 @@
     }
 
     /**
-    * Specifies the class implementing the message marshaler. The message 
-    * marshaller is responsible for marshalling and unmarshalling JMS messages. 
+    * Specifies the class implementing the message marshaler. The message
+    * marshaller is responsible for marshalling and unmarshalling JMS messages.
     * The default is <code>DefaultProviderMarshaler</code>.
     *
      * @param marshaler the marshaler implementation
@@ -277,14 +277,14 @@
     }
 
     /**
-    * Specifies if your endpoint requires JMS message IDs. Setting the 
-    * <code>messageIdEnabled</code> property to <code>false</code> causes the 
-    * endpoint to call its message producer's 
-    * <code>setDisableMessageID() </code> with a value of <code>true</code>. 
-    * The JMS broker is then given a hint that it does not need to generate 
-    * message IDs or add them to the messages from the endpoint. The JMS 
+    * Specifies if your endpoint requires JMS message IDs. Setting the
+    * <code>messageIdEnabled</code> property to <code>false</code> causes the
+    * endpoint to call its message producer's
+    * <code>setDisableMessageID() </code> with a value of <code>true</code>.
+    * The JMS broker is then given a hint that it does not need to generate
+    * message IDs or add them to the messages from the endpoint. The JMS
     * broker can choose to accept the hint or ignore it.
-    * 
+    *
      * @param messageIdEnabled the endpoint requires message IDs?
      */
     public void setMessageIdEnabled(boolean messageIdEnabled) {
@@ -299,14 +299,14 @@
     }
 
     /**
-    * Specifies if your endpoints requires time stamps on its messages. 
-    * Setting the <code>messageTimeStampEnabled</code> property to 
-    * <code>false</code> causes the endpoint to call its message producer's 
-    * <code>setDisableMessageTimestamp() </code> method with a value of 
-    * <code>true</code>. The JMS broker is then given a hint that it does not 
-    * need to generate message IDs or add them to the messages from the 
+    * Specifies if your endpoints requires time stamps on its messages.
+    * Setting the <code>messageTimeStampEnabled</code> property to
+    * <code>false</code> causes the endpoint to call its message producer's
+    * <code>setDisableMessageTimestamp() </code> method with a value of
+    * <code>true</code>. The JMS broker is then given a hint that it does not
+    * need to generate message IDs or add them to the messages from the
     * endpoint. The JMS broker can choose to accept the hint or ignore it.
-    * 
+    *
      * @param messageTimestampEnabled the endpoint requires time stamps?
      */
     public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
@@ -337,8 +337,8 @@
     }
 
     /**
-    * Specifies if the destination is a topic. <code>true</code> means the 
-    * destination is a topic. <code>false</code> means the destination is a 
+    * Specifies if the destination is a topic. <code>true</code> means the
+    * destination is a topic. <code>false</code> means the destination is a
     * queue.
     *
      * @param pubSubDomain the destination is a topic?
@@ -355,7 +355,7 @@
     }
 
     /**
-    * Specifies if messages published by the listener's <code>Connection</code> 
+    * Specifies if messages published by the listener's <code>Connection</code>
     * are suppressed. The default is <code>false</code>.
     *
      * @param pubSubNoLocal messages are surpressed?
@@ -579,7 +579,7 @@
         };
         template.execute(callback, true);
     }
-    
+
     /**
      * Process an InOnly or RobustInOnly exchange inside a JMS session.
      * This method delegates the JMS message creation to the marshaler and uses
@@ -596,18 +596,40 @@
     protected void processInOutInSession(final MessageExchange exchange,
                                          final NormalizedMessage in,
                                          final Session session) throws Exception {
+        // TODO: the following code may not work in pub/sub domain with temporary topics.
+        // The reason is that the the consumer is created after the request is sent.
+        // This mean that the response may be sent before the consumer is created, which would
+        // mean the consumer will not receive the response as it does not use durable subscriptions.
+
         // Create destinations
-        final Destination dest = getDestination(exchange, in, session);
-        final Destination replyDest = getReplyDestination(exchange, in, session);
+        Destination dest = getDestination(exchange, in, session);
+        // Find reply destination
+        // If we use the replyDestination / replyDestinationName, set the async flag to true
+        // to indicate we will use the listener container
+        boolean asynchronous = false;
+        boolean useSelector = true;
+        Destination replyDest = chooseDestination(exchange, in, session, replyDestinationChooser, null);
+        if (replyDest == null) {
+            useSelector = false;
+            replyDest = chooseDestination(exchange, in, session, null,
+                                          replyDestination != null ? replyDestination : replyDestinationName);
+            if (replyDest != null) {
+                asynchronous = true;
+            } else {
+                if (isPubSubDomain()) {
+                    replyDest = session.createTemporaryTopic();
+                } else {
+                    replyDest = session.createTemporaryQueue();
+                }
+            }
+        }
         // Create message and send it
         final Message sendJmsMsg = marshaler.createMessage(exchange, in, session);
         sendJmsMsg.setJMSReplyTo(replyDest);
         // handle correlation ID
-        String correlationId = sendJmsMsg.getJMSMessageID() != null ? sendJmsMsg.getJMSMessageID() : exchange.getExchangeId(); 
+        String correlationId = sendJmsMsg.getJMSMessageID() != null ? sendJmsMsg.getJMSMessageID() : exchange.getExchangeId();
         sendJmsMsg.setJMSCorrelationID(correlationId);
 
-        boolean asynchronous = replyDest.equals(replyDestination);
-
         if (asynchronous) {
             createAndStartListener();
             store.store(correlationId, exchange);
@@ -624,7 +646,7 @@
 
         if (!asynchronous) {
             // Create selector
-            String selector = MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END;
+            String selector = useSelector ? (MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END) : null;
             // Receiving JMS Message, Creating and Returning NormalizedMessage out
             Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
             if (receiveJmsMsg == null) {
@@ -686,8 +708,8 @@
         }
     }
 
-    private Message receiveSelected(final Session session, 
-                                    final Destination dest, 
+    private Message receiveSelected(final Session session,
+                                    final Destination dest,
                                     final String messageSelector) throws JMSException {
         // Do not call directly the template to avoid the cost of creating a new connection / session
 //        return template.doReceive(session, dest, messageSelector);
@@ -708,7 +730,7 @@
      * Process a JMS response message.
      * This method delegates to the marshaler for the JBI out message creation
      * and sends it in to the NMR.
-     * 
+     *
      * @param message
      */
     protected void onMessage(Message message) {
@@ -773,7 +795,7 @@
      * @throws JMSException
      */
     protected Destination getDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        Destination dest = chooseDestination(exchange, message, session, destinationChooser, 
+        Destination dest = chooseDestination(exchange, message, session, destinationChooser,
                                              destination != null ? destination : destinationName);
         if (dest == null) {
             throw new IllegalStateException("Unable to choose a destination for exchange " + exchange);
@@ -782,31 +804,6 @@
     }
 
     /**
-     * Choose the JMS destination for the reply message.
-     * If no default destination is specified or can be extracted from the JBI exchange,
-     * a temporary destination will be created.
-     *
-     * @param exchange
-     * @param message
-     * @param session
-     * @return
-     * @throws JMSException
-     */
-    protected Destination getReplyDestination(MessageExchange exchange, Object message, Session session) throws JMSException {
-        Destination dest = chooseDestination(exchange, message, session, replyDestinationChooser, 
-                                             replyDestination != null ? replyDestination : replyDestinationName);
-        if (dest == null) {
-            if (isPubSubDomain()) {
-                return session.createTemporaryQueue();
-            } else {
-                return session.createTemporaryTopic();
-            }
-        } else {
-            return dest;
-        }
-    }
-
-    /**
      * Choose a JMS destination given the chooser, a default destination and name
      * @param exchange
      * @param message
@@ -834,8 +831,8 @@
         if (dest instanceof Destination) {
             return (Destination) dest;
         } else if (dest instanceof String) {
-            return destinationResolver.resolveDestinationName(session, 
-                                                              (String) dest, 
+            return destinationResolver.resolveDestinationName(session,
+                                                              (String) dest,
                                                               isPubSubDomain());
         }
         return null;
@@ -867,7 +864,7 @@
 
     /**
      * Stops this endpoint.
-     * 
+     *
      * @throws Exception
      */
     public synchronized void deactivate() throws Exception {
@@ -887,7 +884,7 @@
 
     /**
      * Validate this endpoint.
-     * 
+     *
      * @throws DeploymentException
      */
     public void validate() throws DeploymentException {
@@ -939,7 +936,7 @@
 
     /**
      * Create the message listener container to receive response messages.
-     * 
+     *
      * @return
      */
     protected AbstractMessageListenerContainer createListenerContainer() {
@@ -967,5 +964,5 @@
         cont.afterPropertiesSet();
         return cont;
     }
-    
+
 }

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=734397&r1=734396&r2=734397&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Wed Jan 14 05:48:12 2009
@@ -31,7 +31,6 @@
 import javax.jms.TextMessage;
 import javax.xml.namespace.QName;
 
-import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
@@ -51,64 +50,63 @@
     private static final String MSG_PROPERTY_BLACKLISTED = "BadPropertyTest";
 
     protected List<String> blackList;
-
+    
     public void testSendWithoutProperties() throws Exception {
         container.activateComponent(createEndpoint(false), "servicemix-jms");
-
+        
         InOnly me = client.createInOnlyExchange();
         NormalizedMessage inMessage = me.getInMessage();
         inMessage.setProperty(MSG_PROPERTY, "Test-Value");
         inMessage.setProperty(MSG_PROPERTY_BLACKLISTED, "Unwanted value");
-        inMessage.setProperty(JbiConstants.DATESTAMP_PROPERTY_NAME, Calendar.getInstance().getTime());
         inMessage.setContent(new StringSource("<hello>world</hello>"));
         me.setService(new QName("jms"));
         client.sendSync(me);
         assertEquals(ExchangeStatus.DONE, me.getStatus());
-
+        
         Message msg = jmsTemplate.receive("destination");
-        assertNull("Found not expected property", msg.getStringProperty(MSG_PROPERTY));
-        assertNull("Found blacklisted property", msg.getStringProperty(MSG_PROPERTY_BLACKLISTED));
-        assertNull("Found " + JbiConstants.DATESTAMP_PROPERTY_NAME + " property", 
-            msg.getObjectProperty(JbiConstants.DATESTAMP_PROPERTY_NAME));
+        assertNull("Found not expected property", msg
+            .getStringProperty(MSG_PROPERTY));
+        assertNull("Found blacklisted property", msg
+                   .getStringProperty(MSG_PROPERTY_BLACKLISTED));
         assertNotNull(msg);
     }
 
     public void testSendSimple() throws Exception {
         container.activateComponent(createEndpoint(), "servicemix-jms");
-
+        
         InOnly me = client.createInOnlyExchange();
         NormalizedMessage inMessage = me.getInMessage();
         inMessage.setProperty(MSG_PROPERTY, "Test-Value");
         inMessage.setProperty(MSG_PROPERTY_BLACKLISTED, "Unwanted value");
+        inMessage.setProperty(JbiConstants.DATESTAMP_PROPERTY_NAME, Calendar.getInstance().getTime());
         inMessage.setContent(new StringSource("<hello>world</hello>"));
         me.setService(new QName("jms"));
         client.sendSync(me);
         assertEquals(ExchangeStatus.DONE, me.getStatus());
-
+        
         Message msg = jmsTemplate.receive("destination");
-        assertNotNull("Expected property not found", msg.getStringProperty(MSG_PROPERTY));
-        assertNull("Found blacklisted property", msg.getStringProperty(MSG_PROPERTY_BLACKLISTED));
+        assertNotNull("Expected property not found", msg
+            .getStringProperty(MSG_PROPERTY));
+        assertNull("Found blacklisted property", msg
+            .getStringProperty(MSG_PROPERTY_BLACKLISTED));
+        assertNull("Found " + JbiConstants.DATESTAMP_PROPERTY_NAME + " property", msg
+            .getObjectProperty(JbiConstants.DATESTAMP_PROPERTY_NAME));
         assertNotNull(msg);
     }
-
-    public void testSoapProviderInOnly() throws Exception {
+    
+    public void testProviderInOnlyWithoutReplyDest() throws Exception {
         JmsComponent component = new JmsComponent();
 
-        JmsSoapProviderEndpoint endpoint = new JmsSoapProviderEndpoint();
+        JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
         endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
         endpoint.setEndpoint("HelloPort");
-        endpoint.setConnectionFactory(connectionFactory);
         endpoint.setDestinationName("destination");
-        endpoint.setWsdl(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC.wsdl"));
+        endpoint.setConnectionFactory(connectionFactory);
         component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
         container.activateComponent(component, "servicemix-jms");
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        FileUtil
-            .copyInputStream(
-                             new ClassPathResource(
-                                                   "org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml")
-                                 .getInputStream(), baos);
+        FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml").getInputStream(), baos);
         InOnly me = client.createInOnlyExchange();
         me.getInMessage().setContent(new StringSource(baos.toString()));
 
@@ -119,12 +117,82 @@
 
         Message msg = jmsTemplate.receive("destination");
         assertNotNull(msg);
-        System.err.println(((TextMessage)msg).getText());
+        System.err.println(((TextMessage) msg).getText());
     }
 
-    public void testSoapProviderInOut() throws Exception {
+    public void testProviderInOutWithoutReplyDest() throws Exception {
+        JmsComponent component = new JmsComponent();
+
+        JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
+        endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
+        endpoint.setEndpoint("HelloPort");
+        endpoint.setDestinationName("destination");
+        endpoint.setConnectionFactory(connectionFactory);
+        component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
+        container.activateComponent(component, "servicemix-jms");
+
+        Thread th = new Thread() {
+            public void run() {
+                try {
+                    final Message msg = jmsTemplate.receive("destination");
+                    assertNotNull(msg);
+                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Output.xml")
+                                .getInputStream(), baos);
+                    jmsTemplate.send(msg.getJMSReplyTo(), new MessageCreator() {
+                        public Message createMessage(Session session) throws JMSException {
+                            TextMessage rep = session.createTextMessage(baos.toString());
+                            rep.setJMSCorrelationID(msg.getJMSCorrelationID() != null ? msg.getJMSCorrelationID() : msg.getJMSMessageID());
+                            return rep;
+                        }
+                    });
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        th.start();
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml").getInputStream(), baos);
+        InOut me = client.createInOutExchange();
+        me.getInMessage().setContent(new StringSource(baos.toString()));
+        me.setOperation(new QName("uri:HelloWorld", "OneWay"));
+        me.setService(new QName("uri:HelloWorld", "HelloService"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getOutMessage());
+        assertNotNull(me.getOutMessage().getContent());
+        System.err.println(new SourceTransformer().contentToString(me.getOutMessage()));
+        client.done(me);
+    }
+
+    public void testSoapProviderInOnly() throws Exception {
         JmsComponent component = new JmsComponent();
+        
+        JmsSoapProviderEndpoint endpoint = new JmsSoapProviderEndpoint();
+        endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
+        endpoint.setEndpoint("HelloPort");
+        endpoint.setConnectionFactory(connectionFactory);
+        endpoint.setDestinationName("destination");
+        endpoint.setWsdl(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC.wsdl"));
+        component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
+        container.activateComponent(component, "servicemix-jms");
+        
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-OneWay.xml").getInputStream(), baos);
+        InOnly me = client.createInOnlyExchange();
+        me.getInMessage().setContent(new StringSource(baos.toString()));
 
+        me.setOperation(new QName("uri:HelloWorld", "OneWay"));
+        me.setService(new QName("uri:HelloWorld", "HelloService"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+    }
+    
+    public void testSoapProviderInOut() throws Exception {
+        JmsComponent component = new JmsComponent();
+        
         JmsSoapProviderEndpoint endpoint = new JmsSoapProviderEndpoint();
         endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
         endpoint.setEndpoint("HelloPort");
@@ -134,23 +202,19 @@
         endpoint.setWsdl(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC.wsdl"));
         component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
         container.activateComponent(component, "servicemix-jms");
-
+        
         Thread th = new Thread() {
             public void run() {
                 try {
                     final Message msg = jmsTemplate.receive("destination");
                     assertNotNull(msg);
                     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    FileUtil
-                        .copyInputStream(
-                                         new ClassPathResource(
-                                                               "org/apache/servicemix/jms/HelloWorld-RPC-Output.xml")
-                                             .getInputStream(), baos);
+                    FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Output.xml")
+                                .getInputStream(), baos);
                     jmsTemplate.send("reply", new MessageCreator() {
                         public Message createMessage(Session session) throws JMSException {
                             TextMessage rep = session.createTextMessage(baos.toString());
-                            rep.setJMSCorrelationID(msg.getJMSCorrelationID() != null ? msg
-                                .getJMSCorrelationID() : msg.getJMSMessageID());
+                            rep.setJMSCorrelationID(msg.getJMSCorrelationID() != null ? msg.getJMSCorrelationID() : msg.getJMSMessageID());
                             return rep;
                         }
                     });
@@ -160,12 +224,9 @@
             }
         };
         th.start();
-
+        
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        FileUtil
-            .copyInputStream(
-                             new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-Hello.xml")
-                                 .getInputStream(), baos);
+        FileUtil.copyInputStream(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC-Input-Hello.xml").getInputStream(), baos);
         InOut me = client.createInOutExchange();
         me.getInMessage().setContent(new StringSource(baos.toString()));
         me.setOperation(new QName("uri:HelloWorld", "Hello"));
@@ -176,7 +237,6 @@
         assertNotNull(me.getOutMessage().getContent());
         System.err.println(new SourceTransformer().contentToString(me.getOutMessage()));
         client.done(me);
-
     }
 
     public void testSoapProviderInOutWithoutReplyDest() throws Exception {
@@ -185,7 +245,7 @@
         JmsSoapProviderEndpoint endpoint = new JmsSoapProviderEndpoint();
         endpoint.setService(new QName("uri:HelloWorld", "HelloService"));
         endpoint.setEndpoint("HelloPort");
-        endpoint.setConnectionFactory(new PooledConnectionFactory(connectionFactory));
+        endpoint.setConnectionFactory(connectionFactory);
         endpoint.setDestinationName("destination");
         endpoint.setWsdl(new ClassPathResource("org/apache/servicemix/jms/HelloWorld-RPC.wsdl"));
         component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
@@ -237,7 +297,7 @@
         // initialize the black list
         blackList = new LinkedList<String>();
         blackList.add(MSG_PROPERTY_BLACKLISTED);
-
+        
         JmsComponent component = new JmsComponent();
         JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
         endpoint.setService(new QName("jms"));