You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2010/07/14 09:38:21 UTC

svn commit: r963962 - /servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java

Author: ccustine
Date: Wed Jul 14 07:38:20 2010
New Revision: 963962

URL: http://svn.apache.org/viewvc?rev=963962&view=rev
Log:
SM-1964 - JMS Flow only uses one connection even with a PooledConnectionFactory

Modified:
    servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java

Modified: servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?rev=963962&r1=963961&r2=963962&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Wed Jul 14 07:38:20 2010
@@ -40,6 +40,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.executors.Executor;
 import org.apache.servicemix.jbi.event.ComponentAdapter;
@@ -470,7 +471,14 @@ public abstract class AbstractJMSFlow ex
                 }
             }
 
-            Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Connection cnx = connection;
+            // with a PooledConnectionFactory get a new connection from the pool
+            if (connectionFactory instanceof PooledConnectionFactory) {
+                connectionFactory.createConnection();
+                cnx.start();
+            }
+            
+            Session inboundSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
             try {
                 Queue queue = inboundSession.createQueue(destination);
                 ObjectMessage msg = inboundSession.createObjectMessage(me);
@@ -483,6 +491,11 @@ public abstract class AbstractJMSFlow ex
                 queueProducer.send(msg);
             } finally {
                 inboundSession.close();
+                if (connectionFactory instanceof PooledConnectionFactory) {
+                    // return connection to the pool
+                    cnx.stop();
+                    cnx.close();
+                }
             }
         } catch (JMSException e) {
             log.error("Failed to send exchange: " + me + " internal JMS Network", e);