You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/03/18 15:38:27 UTC
svn commit: r755606 - in /servicemix/smx4/nmr/trunk/jbi/cluster:
engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/
engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/
requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/
Author: gnodet
Date: Wed Mar 18 14:38:27 2009
New Revision: 755606
URL: http://svn.apache.org/viewvc?rev=755606&view=rev
Log:
Fix reconnect logic for ActiveMQ requestor pool
Modified:
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java
servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java Wed Mar 18 14:38:27 2009
@@ -272,7 +272,6 @@
protected AtomicInteger pendingExchanges = new AtomicInteger();
protected AtomicBoolean pauseConsumption = new AtomicBoolean(false);
protected int maxPendingExchanges = DEFAULT_MAX_PENDING_EXCHANGES;
- protected AtomicBoolean paused = new AtomicBoolean(false);
public Channel getChannel() {
return channel;
@@ -370,7 +369,7 @@
}
public void pause() {
- if (paused.compareAndSet(false, true)) {
+ if (pauseConsumption.compareAndSet(false, true)) {
if (logger.isDebugEnabled()) {
logger.debug("Pausing cluster endpoint: " + name);
}
@@ -379,7 +378,7 @@
}
public void resume() {
- if (paused.compareAndSet(true, false)) {
+ if (pauseConsumption.compareAndSet(true, false)) {
if (logger.isDebugEnabled()) {
logger.debug("Resuming cluster endpoint: " + name);
}
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/AbstractClusterEndpointTest.java Wed Mar 18 14:38:27 2009
@@ -88,7 +88,7 @@
this.connectionFactory = createConnectionFactory();
this.nmr1 = createNmr();
this.nmr2 = createNmr();
- this.listener = new ExchangeCompletedListener();
+ this.listener = new ExchangeCompletedListener(60000);
this.nmr1.getListenerRegistry().register(this.listener, null);
this.nmr2.getListenerRegistry().register(this.listener, null);
}
@@ -113,8 +113,8 @@
@Override
protected void tearDown() throws Exception {
- ((DisposableBean) executor).destroy();
listener.assertExchangeCompleted();
+ ((DisposableBean) executor).destroy();
if (executor instanceof DisposableBean) {
((DisposableBean) executor).destroy();
}
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/test/java/org/apache/servicemix/jbi/cluster/engine/ReconnectTest.java Wed Mar 18 14:38:27 2009
@@ -91,18 +91,12 @@
latch.await();
-// Thread.sleep(1000);
-
+ broker.stop();
cluster2.resume();
- //Thread.sleep(100);
-
- broker.stop();
-// Thread.sleep(1000);
+ Thread.sleep(500);
broker = createBroker(false);
- latch.await();
-
receiver.assertExchangesReceived(nbThreads * nbExchanges, TIMEOUT);
//Thread.sleep(500);
//receiver.assertExchangesReceived(nbThreads * nbExchanges, TIMEOUT);
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java Wed Mar 18 14:38:27 2009
@@ -296,10 +296,10 @@
* Destroy this item and free associated JMS resources.
*/
protected void destroy() {
-// closeMessageConsumer(consumer);
-// closeMessageProducer(producer);
JmsUtils.closeSession(session);
JmsUtils.closeConnection(connection);
+ session = null;
+ connection = null;
}
/**
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java?rev=755606&r1=755605&r2=755606&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/ActiveMQJmsRequestorPool.java Wed Mar 18 14:38:27 2009
@@ -40,6 +40,7 @@
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.pool.PooledSession;
import org.springframework.jms.JmsException;
+import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.util.Assert;
public class ActiveMQJmsRequestorPool extends AbstractPollingRequestorPool implements ExceptionListener {
@@ -71,6 +72,14 @@
}
public void onException(JMSException exception) {
+ handleListenerException(exception);
+ if (sharedConnectionEnabled()) {
+ try {
+ refreshSharedConnection();
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
recreateConsumers(true);
}
@@ -147,11 +156,15 @@
}
}
for (ActiveMQRequestor requestor : reqs) {
- requestor.destroyConsumer();
- try {
- requestor.afterClose();
- } catch (Throwable t) {
+ if (destroyRequestors) {
requestor.destroy();
+ } else {
+ requestor.destroyConsumer();
+ try {
+ requestor.afterClose();
+ } catch (Throwable t) {
+ requestor.destroy();
+ }
}
}
startConsumers();
@@ -174,6 +187,7 @@
} catch (Throwable ex) {
handleListenerSetupFailure(ex, alreadyRecovered);
alreadyRecovered = true;
+ recoverAfterListenerSetupFailure();
}
}
consumersStarting = false;