You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/03/17 17:18:15 UTC

svn commit: r924350 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Author: rajdavies
Date: Wed Mar 17 16:18:14 2010
New Revision: 924350

URL: http://svn.apache.org/viewvc?rev=924350&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2645

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=924350&r1=924349&r2=924350&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Mar 17 16:18:14 2010
@@ -186,7 +186,7 @@ public class ActiveMQConnection implemen
     private DestinationSource destinationSource;
     private final Object ensureConnectionInfoSentMutex = new Object();
     private boolean useDedicatedTaskRunner;
-    protected CountDownLatch transportInterruptionProcessingComplete;
+    protected volatile CountDownLatch transportInterruptionProcessingComplete;
     private long consumerFailoverRedeliveryWaitPeriod;
 
     /**
@@ -1840,7 +1840,7 @@ public class ActiveMQConnection implemen
 	}
 
     public void transportInterupted() {
-        transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
+        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
         if (LOG.isDebugEnabled()) {
             LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
         }
@@ -2245,17 +2245,20 @@ public class ActiveMQConnection implemen
 	}
 	
 	protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
-        if (transportInterruptionProcessingComplete != null) {
-            while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(10, TimeUnit.SECONDS)) {
-                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
+	    CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+	    if (cdl != null) {
+            if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
+                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
+                cdl.await(10, TimeUnit.SECONDS);
             }
             signalInterruptionProcessingComplete();
         }
     }
 	
-	protected synchronized void transportInterruptionProcessingComplete() {
-	    if (transportInterruptionProcessingComplete != null) {
-	        transportInterruptionProcessingComplete.countDown();
+	protected void transportInterruptionProcessingComplete() {
+	    CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+	    if (cdl != null) {
+	        cdl.countDown();
 	        try {
 	            signalInterruptionProcessingComplete();
 	        } catch (InterruptedException ignored) {}
@@ -2263,20 +2266,22 @@ public class ActiveMQConnection implemen
 	}
 
     private void signalInterruptionProcessingComplete() throws InterruptedException {
-        if (transportInterruptionProcessingComplete.await(0, TimeUnit.SECONDS)) {
+        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+        if (cdl.getCount()==0) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
             }
-            synchronized (this) {
-                transportInterruptionProcessingComplete = null;
-                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
-                if (failoverTransport != null) {
-                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("notified failover transport (" + failoverTransport +") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
-                    }
-                } 
+            this.transportInterruptionProcessingComplete = null;
+
+            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
+            if (failoverTransport != null) {
+                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("notified failover transport (" + failoverTransport
+                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
+                }
             }
+
         }
     }