You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/12/20 21:42:18 UTC

svn commit: r1221484 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Author: tabish
Date: Tue Dec 20 20:42:18 2011
New Revision: 1221484

URL: http://svn.apache.org/viewvc?rev=1221484&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3625

Ensure that incoming commands get exception responses after the initial error is triggered so that client transports don't block wia intg for responses.  

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1221484&r1=1221483&r2=1221484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Dec 20 20:42:18 2011
@@ -119,6 +119,7 @@ public class TransportConnection impleme
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
     private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
     private String duplexNetworkConnectorId;
+    private Throwable stopError = null;
 
     /**
      * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@@ -139,14 +140,6 @@ public class TransportConnection impleme
         this.transport.setTransportListener(new DefaultTransportListener() {
             @Override
             public void onCommand(Object o) {
-
-                if (pendingStop) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Ignoring Command due to pending stop: " + o);
-                    }
-                    return;
-                }
-
                 serviceLock.readLock().lock();
                 try {
                     if (!(o instanceof Command)) {
@@ -258,6 +251,8 @@ public class TransportConnection impleme
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
                 dispatchSync(ce);
+                // Record the error that caused the transport to stop
+                this.stopError = e;
                 // Wait a little bit to try to get the output buffer to flush
                 // the exption notification to the client.
                 try {
@@ -292,7 +287,11 @@ public class TransportConnection impleme
         boolean responseRequired = command.isResponseRequired();
         int commandId = command.getCommandId();
         try {
-            response = command.visit(this);
+            if (!pendingStop) {
+                response = command.visit(this);
+            } else {
+                response = new ExceptionResponse(this.stopError);
+            }
         } catch (Throwable e) {
             if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
                 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
@@ -301,7 +300,7 @@ public class TransportConnection impleme
 
             if(e instanceof java.lang.SecurityException){
                 // still need to close this down - in case the peer of this transport doesn't play nice
-                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage());
+                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
             }
 
             if (responseRequired) {
@@ -928,10 +927,11 @@ public class TransportConnection impleme
         }
     }
 
-    public void delayedStop(final int waitTime, final String reason) {
+    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
         if (waitTime > 0) {
             synchronized (this) {
                 pendingStop = true;
+                stopError = cause;
             }
             try {
                 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {