You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/12/20 21:42:18 UTC
svn commit: r1221484 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Author: tabish
Date: Tue Dec 20 20:42:18 2011
New Revision: 1221484
URL: http://svn.apache.org/viewvc?rev=1221484&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3625
Ensure that incoming commands get exception responses after the initial error is triggered so that client transports don't block waiting for responses.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1221484&r1=1221483&r2=1221484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Dec 20 20:42:18 2011
@@ -119,6 +119,7 @@ public class TransportConnection impleme
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
+ private Throwable stopError = null;
/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@@ -139,14 +140,6 @@ public class TransportConnection impleme
this.transport.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
-
- if (pendingStop) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Ignoring Command due to pending stop: " + o);
- }
- return;
- }
-
serviceLock.readLock().lock();
try {
if (!(o instanceof Command)) {
@@ -258,6 +251,8 @@ public class TransportConnection impleme
ConnectionError ce = new ConnectionError();
ce.setException(e);
dispatchSync(ce);
+ // Record the error that caused the transport to stop
+ this.stopError = e;
// Wait a little bit to try to get the output buffer to flush
// the exption notification to the client.
try {
@@ -292,7 +287,11 @@ public class TransportConnection impleme
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
try {
- response = command.visit(this);
+ if (!pendingStop) {
+ response = command.visit(this);
+ } else {
+ response = new ExceptionResponse(this.stopError);
+ }
} catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
@@ -301,7 +300,7 @@ public class TransportConnection impleme
if(e instanceof java.lang.SecurityException){
// still need to close this down - in case the peer of this transport doesn't play nice
- delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage());
+ delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
}
if (responseRequired) {
@@ -928,10 +927,11 @@ public class TransportConnection impleme
}
}
- public void delayedStop(final int waitTime, final String reason) {
+ public void delayedStop(final int waitTime, final String reason, Throwable cause) {
if (waitTime > 0) {
synchronized (this) {
pendingStop = true;
+ stopError = cause;
}
try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {