You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/07 10:51:20 UTC

svn commit: r619336 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: pool/ConnectionPool.java transport/failover/FailoverTransport.java

Author: rajdavies
Date: Thu Feb  7 01:51:19 2008
New Revision: 619336

URL: http://svn.apache.org/viewvc?rev=619336&view=rev
Log:
Fix for:
https://issues.apache.org/activemq/browse/AMQ-1116
https://issues.apache.org/activemq/browse/AMQ-1575
https://issues.apache.org/activemq/browse/AMQ-1577
https://issues.apache.org/activemq/browse/AMQ-1581

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=619336&r1=619335&r2=619336&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Thu Feb  7 01:51:19 2008
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.pool;
 
 import java.io.IOException;
@@ -73,7 +74,14 @@
 
             public void transportResumed() {
             }
-        });
+        });       
+        //
+        // make sure that we set the hasFailed flag, in case the transport already failed
+        // prior to the addition of our new TransportListener
+        //
+        if(connection.isTransportFailed()) {
+            hasFailed = true;
+        }
     }
 
     public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory,
@@ -218,5 +226,4 @@
             }
         }
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=619336&r1=619335&r2=619336&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Feb  7 01:51:19 2008
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.transport.failover;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.state.ConnectionStateTracker;
 import org.apache.activemq.state.Tracked;
 import org.apache.activemq.thread.DefaultThreadPools;
@@ -176,7 +178,12 @@
             transportListener.transportInterupted();
         }
         synchronized (reconnectMutex) {
-            LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
+            boolean reconnectOk = false;
+            if(started) {
+                LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
+                reconnectOk = true;
+            }
+            
             if (connectedTransport != null) {
                 initialized = false;
                 ServiceSupport.dispose(connectedTransport);
@@ -185,7 +192,10 @@
                 connectedTransportURI = null;
                 connected=false;
             }
-            reconnectTask.wakeup();
+            	
+            if(reconnectOk) {
+            	reconnectTask.wakeup();
+            }
         }
     }
 
@@ -307,6 +317,21 @@
 	public void setBackupPoolSize(int backupPoolSize) {
 		this.backupPoolSize = backupPoolSize;
 	}
+	
+/*
+* BEGIN Patch segment for issue AMQ-1116
+*/	
+    /**
+     * @return Returns true if the command is one sent when a connection
+     * is being closed.
+     */
+    private boolean isShutdownCommand(Command command) {
+	return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
+    }
+/*
+* END Patch segment
+*/
+	 
 
     public void oneway(Object o) throws IOException {
         Command command = (Command)o;
@@ -314,6 +339,20 @@
         try {
 
             synchronized (reconnectMutex) {
+ 
+                if (isShutdownCommand(command) && connectedTransport == null) {
+                    if(command.isShutdownInfo()) {
+                        // Skipping send of ShutdownInfo command when not connected.
+                        return;
+                    }
+                    if(command instanceof RemoveInfo) {
+                        // Simulate response to RemoveInfo command
+                        Response response = new Response();
+                        response.setCorrelationId(command.getCommandId());
+                        myTransportListener.onCommand(response);
+                        return;
+                    }
+                }                      
                 // Keep trying until the message is sent.
                 for (int i = 0; !disposed; i++) {
                     try {
@@ -609,11 +648,28 @@
             if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
                 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                 connectionFailure = failure;
+ 	
+                // Make sure on initial startup, that the transportListener has been initialized
+                // for this instance.
+                while(transportListener == null) {
+                    try {
+                        Thread.sleep(100);
+                    }
+                    catch(InterruptedException iEx) {}
+                }
+
+          
+                if(transportListener != null) {
+                    if (connectionFailure instanceof IOException) {
+                    	transportListener.onException((IOException)connectionFailure);
+                    } else {
+                    	transportListener.onException(IOExceptionSupport.create(connectionFailure));
+                    }
+                }        
                 reconnectMutex.notifyAll();
                 return false;
             }
         }
-
         if (!disposed) {
 
             LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");