You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/07 10:51:20 UTC
svn commit: r619336 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
pool/ConnectionPool.java transport/failover/FailoverTransport.java
Author: rajdavies
Date: Thu Feb 7 01:51:19 2008
New Revision: 619336
URL: http://svn.apache.org/viewvc?rev=619336&view=rev
Log:
Fix for:
https://issues.apache.org/activemq/browse/AMQ-1116
https://issues.apache.org/activemq/browse/AMQ-1575
https://issues.apache.org/activemq/browse/AMQ-1577
https://issues.apache.org/activemq/browse/AMQ-1581
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=619336&r1=619335&r2=619336&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Thu Feb 7 01:51:19 2008
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.activemq.pool;
import java.io.IOException;
@@ -73,7 +74,14 @@
public void transportResumed() {
}
- });
+ });
+ //
+ // make sure that we set the hasFailed flag, in case the transport already failed
+ // prior to the addition of our new TransportListener
+ //
+ if(connection.isTransportFailed()) {
+ hasFailed = true;
+ }
}
public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory,
@@ -218,5 +226,4 @@
}
}
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=619336&r1=619335&r2=619336&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Feb 7 01:51:19 2008
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.activemq.transport.failover;
import java.io.IOException;
@@ -29,6 +30,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
@@ -176,7 +178,12 @@
transportListener.transportInterupted();
}
synchronized (reconnectMutex) {
- LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
+ boolean reconnectOk = false;
+ if(started) {
+ LOG.warn("Transport failed, attempting to automatically reconnect due to: " + e, e);
+ reconnectOk = true;
+ }
+
if (connectedTransport != null) {
initialized = false;
ServiceSupport.dispose(connectedTransport);
@@ -185,7 +192,10 @@
connectedTransportURI = null;
connected=false;
}
- reconnectTask.wakeup();
+
+ if(reconnectOk) {
+ reconnectTask.wakeup();
+ }
}
}
@@ -307,6 +317,21 @@
public void setBackupPoolSize(int backupPoolSize) {
this.backupPoolSize = backupPoolSize;
}
+
+/*
+* BEGIN Patch segment for issue AMQ-1116
+*/
+ /**
+ * @return Returns true if the command is one sent when a connection
+ * is being closed.
+ */
+ private boolean isShutdownCommand(Command command) {
+ return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
+ }
+/*
+* END Patch segment
+*/
+
public void oneway(Object o) throws IOException {
Command command = (Command)o;
@@ -314,6 +339,20 @@
try {
synchronized (reconnectMutex) {
+
+ if (isShutdownCommand(command) && connectedTransport == null) {
+ if(command.isShutdownInfo()) {
+ // Skipping send of ShutdownInfo command when not connected.
+ return;
+ }
+ if(command instanceof RemoveInfo) {
+ // Simulate response to RemoveInfo command
+ Response response = new Response();
+ response.setCorrelationId(command.getCommandId());
+ myTransportListener.onCommand(response);
+ return;
+ }
+ }
// Keep trying until the message is sent.
for (int i = 0; !disposed; i++) {
try {
@@ -609,11 +648,28 @@
if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
+
+ // Make sure on initial startup, that the transportListener has been initialized
+ // for this instance.
+ while(transportListener == null) {
+ try {
+ Thread.sleep(100);
+ }
+ catch(InterruptedException iEx) {}
+ }
+
+
+ if(transportListener != null) {
+ if (connectionFailure instanceof IOException) {
+ transportListener.onException((IOException)connectionFailure);
+ } else {
+ transportListener.onException(IOExceptionSupport.create(connectionFailure));
+ }
+ }
reconnectMutex.notifyAll();
return false;
}
}
-
if (!disposed) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");