You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/05/12 14:23:24 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5772
Repository: activemq
Updated Branches:
refs/heads/master b679c8d4d -> e19293de5
https://issues.apache.org/jira/browse/AMQ-5772
Improve the Broker.removeConnection() method so that the cause of the connection's removal (if available) is passed to it
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e19293de
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e19293de
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e19293de
Branch: refs/heads/master
Commit: e19293de5f434b0a80af3892f29ea964b52e27cb
Parents: b679c8d
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Tue May 12 14:22:15 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Tue May 12 14:23:12 2015 +0200
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 18 +++++++++++-------
.../activemq/broker/region/RegionBroker.java | 2 +-
.../apache/activemq/broker/LinkStealingTest.java | 13 +++++++++++++
3 files changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e19293de/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b5e1c55..a86a36e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -117,7 +117,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected BrokerInfo brokerInfo;
protected final List<Command> dispatchQueue = new LinkedList<Command>();
protected TaskRunner taskRunner;
- protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
+ protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
@@ -152,7 +152,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
- private Throwable stopError = null;
/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@@ -245,7 +244,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
TRANSPORTLOG.warn(this + " failed: " + e);
}
- stopAsync();
+ stopAsync(e);
}
}
@@ -295,7 +294,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ce.setException(e);
dispatchSync(ce);
// Record the error that caused the transport to stop
- this.stopError = e;
+ transportException.set(e);
// Wait a little bit to try to get the output buffer to flush
// the exception notification to the client.
try {
@@ -334,7 +333,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if (!pendingStop) {
response = command.visit(this);
} else {
- response = new ExceptionResponse(this.stopError);
+ response = new ExceptionResponse(transportException.get());
}
} catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
@@ -863,7 +862,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
iter.remove();
}
try {
- broker.removeConnection(cs.getContext(), cs.getInfo(), null);
+ broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
} catch (Throwable e) {
SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
}
@@ -1073,7 +1072,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if (waitTime > 0) {
synchronized (this) {
pendingStop = true;
- stopError = cause;
+ transportException.set(cause);
}
try {
stopTaskRunnerFactory.execute(new Runnable() {
@@ -1093,6 +1092,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
+ public void stopAsync(Throwable cause) {
+ transportException.set(cause);
+ stopAsync();
+ }
+
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/e19293de/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index bdeeb76..e975b4c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -266,7 +266,7 @@ public class RegionBroker extends EmptyBroker {
LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
if (connection instanceof TransportConnection) {
TransportConnection transportConnection = (TransportConnection) connection;
- transportConnection.stopAsync();
+ transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));
} else {
connection.stop();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e19293de/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
index ec944fb..60fce40 100644
--- a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
@@ -18,20 +18,32 @@ package org.apache.activemq.broker;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ConnectionInfo;
import javax.jms.Connection;
import javax.jms.InvalidClientIDException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
public class LinkStealingTest extends TestCase {
protected BrokerService brokerService;
protected int timeOutInSeconds = 10;
+ protected final AtomicReference<Throwable> removeException = new AtomicReference<Throwable>();
@Override
protected void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
+ brokerService.setPlugins(new BrokerPlugin[]{
+ new BrokerPluginSupport() {
+ @Override
+ public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
+ removeException.set(error);
+ super.removeConnection(context, info, error);
+ }
+ }
+ });
}
@Override
@@ -86,6 +98,7 @@ public class LinkStealingTest extends TestCase {
exceptionFlag.set(true);
}
assertFalse(exceptionFlag.get());
+ assertNotNull(removeException.get());
}
}