You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/03/18 15:38:27 UTC

svn commit: r755606 - in /servicemix/smx4/nmr/trunk/jbi/cluster: engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/

Author: gnodet
Date: Wed Mar 18 14:38:27 2009
New Revision: 755606

URL: http://svn.apache.org/viewvc?rev=755606&view=rev
Log:
Fix reconnect logic for ActiveMQ requestor pool

Modified:
    servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
    servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java
    servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java
    servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
    servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java Wed Mar 18 14:38:27 2009
@@ -272,7 +272,6 @@
     protected AtomicInteger pendingExchanges = new AtomicInteger();
     protected AtomicBoolean pauseConsumption = new AtomicBoolean(false);
     protected int maxPendingExchanges = DEFAULT_MAX_PENDING_EXCHANGES;
-    protected AtomicBoolean paused = new AtomicBoolean(false);
 
     public Channel getChannel() {
         return channel;
@@ -370,7 +369,7 @@
     }
 
     public void pause() {
-        if (paused.compareAndSet(false, true)) {
+        if (pauseConsumption.compareAndSet(false, true)) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Pausing cluster endpoint: " + name);
             }
@@ -379,7 +378,7 @@
     }
 
     public void resume() {
-        if (paused.compareAndSet(true, false)) {
+        if (pauseConsumption.compareAndSet(true, false)) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Resuming cluster endpoint: " + name);
             }

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java Wed Mar 18 14:38:27 2009
@@ -88,7 +88,7 @@
         this.connectionFactory = createConnectionFactory();
         this.nmr1 = createNmr();
         this.nmr2 = createNmr();
-        this.listener = new ExchangeCompletedListener();
+        this.listener = new ExchangeCompletedListener(60000);
         this.nmr1.getListenerRegistry().register(this.listener, null);
         this.nmr2.getListenerRegistry().register(this.listener, null);
     }
@@ -113,8 +113,8 @@
 
     @Override
     protected void tearDown() throws Exception {
-        ((DisposableBean) executor).destroy();
         listener.assertExchangeCompleted();
+        ((DisposableBean) executor).destroy();
         if (executor instanceof DisposableBean) {
             ((DisposableBean) executor).destroy();
         }

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java Wed Mar 18 14:38:27 2009
@@ -91,18 +91,12 @@
 
         latch.await();
 
-//        Thread.sleep(1000);
-
+        broker.stop();
         cluster2.resume();
 
-        //Thread.sleep(100);
-
-        broker.stop();
-//        Thread.sleep(1000);
+        Thread.sleep(500);
         broker = createBroker(false);
 
-        latch.await();
-
         receiver.assertExchangesReceived(nbThreads * nbExchanges, TIMEOUT);
         //Thread.sleep(500);
         //receiver.assertExchangesReceived(nbThreads * nbExchanges, TIMEOUT);

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java Wed Mar 18 14:38:27 2009
@@ -296,10 +296,10 @@
          * Destroy this item and free associated JMS resources.
          */
         protected void destroy() {
-//                closeMessageConsumer(consumer);
-//                closeMessageProducer(producer);
             JmsUtils.closeSession(session);
             JmsUtils.closeConnection(connection);
+            session = null;
+            connection = null;
         }
 
         /**

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java Wed Mar 18 14:38:27 2009
@@ -40,6 +40,7 @@
 import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.activemq.pool.PooledSession;
 import org.springframework.jms.JmsException;
+import org.springframework.jms.connection.ConnectionFactoryUtils;
 import org.springframework.util.Assert;
 
 public class ActiveMQJmsRequestorPool extends AbstractPollingRequestorPool implements ExceptionListener {
@@ -71,6 +72,14 @@
     }
 
     public void onException(JMSException exception) {
+        handleListenerException(exception);
+        if (sharedConnectionEnabled()) {
+            try {
+                refreshSharedConnection();
+            } catch (JMSException e) {
+                e.printStackTrace();
+            }
+        }
         recreateConsumers(true);
     }
 
@@ -147,11 +156,15 @@
                 }
             }
             for (ActiveMQRequestor requestor : reqs) {
-                requestor.destroyConsumer();
-                try {
-                    requestor.afterClose();
-                } catch (Throwable t) {
+                if (destroyRequestors) {
                     requestor.destroy();
+                } else {
+                    requestor.destroyConsumer();
+                    try {
+                        requestor.afterClose();
+                    } catch (Throwable t) {
+                        requestor.destroy();
+                    }
                 }
             }
             startConsumers();
@@ -174,6 +187,7 @@
                         } catch (Throwable ex) {
                             handleListenerSetupFailure(ex, alreadyRecovered);
                             alreadyRecovered = true;
+                            recoverAfterListenerSetupFailure();
                         }
                     }
                     consumersStarting = false;