You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/05/12 14:23:24 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5772

Repository: activemq
Updated Branches:
  refs/heads/master b679c8d4d -> e19293de5


https://issues.apache.org/jira/browse/AMQ-5772

Improve the Broker.removeConnection() method so that the cause of the connection's removal (if available) is passed to it


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e19293de
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e19293de
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e19293de

Branch: refs/heads/master
Commit: e19293de5f434b0a80af3892f29ea964b52e27cb
Parents: b679c8d
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Tue May 12 14:22:15 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Tue May 12 14:23:12 2015 +0200

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java      | 18 +++++++++++-------
 .../activemq/broker/region/RegionBroker.java      |  2 +-
 .../apache/activemq/broker/LinkStealingTest.java  | 13 +++++++++++++
 3 files changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e19293de/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b5e1c55..a86a36e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -117,7 +117,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     protected BrokerInfo brokerInfo;
     protected final List<Command> dispatchQueue = new LinkedList<Command>();
     protected TaskRunner taskRunner;
-    protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
+    protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
     protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
     private final Transport transport;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
@@ -152,7 +152,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
     private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
     private String duplexNetworkConnectorId;
-    private Throwable stopError = null;
 
     /**
      * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@@ -245,7 +244,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
             } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
                 TRANSPORTLOG.warn(this + " failed: " + e);
             }
-            stopAsync();
+            stopAsync(e);
         }
     }
 
@@ -295,7 +294,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 ce.setException(e);
                 dispatchSync(ce);
                 // Record the error that caused the transport to stop
-                this.stopError = e;
+                transportException.set(e);
                 // Wait a little bit to try to get the output buffer to flush
                 // the exception notification to the client.
                 try {
@@ -334,7 +333,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
             if (!pendingStop) {
                 response = command.visit(this);
             } else {
-                response = new ExceptionResponse(this.stopError);
+                response = new ExceptionResponse(transportException.get());
             }
         } catch (Throwable e) {
             if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
@@ -863,7 +862,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 iter.remove();
             }
             try {
-                broker.removeConnection(cs.getContext(), cs.getInfo(), null);
+                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
             } catch (Throwable e) {
                 SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
             }
@@ -1073,7 +1072,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         if (waitTime > 0) {
             synchronized (this) {
                 pendingStop = true;
-                stopError = cause;
+                transportException.set(cause);
             }
             try {
                 stopTaskRunnerFactory.execute(new Runnable() {
@@ -1093,6 +1092,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         }
     }
 
+    public void stopAsync(Throwable cause) {
+        transportException.set(cause);
+        stopAsync();
+    }
+
     public void stopAsync() {
         // If we're in the middle of starting then go no further... for now.
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e19293de/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index bdeeb76..e975b4c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -266,7 +266,7 @@ public class RegionBroker extends EmptyBroker {
                 LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
                 if (connection instanceof TransportConnection) {
                     TransportConnection transportConnection = (TransportConnection) connection;
-                    transportConnection.stopAsync();
+                    transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));
                 } else {
                     connection.stop();
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e19293de/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
index ec944fb..60fce40 100644
--- a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java
@@ -18,20 +18,32 @@ package org.apache.activemq.broker;
 
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ConnectionInfo;
 
 import javax.jms.Connection;
 import javax.jms.InvalidClientIDException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class LinkStealingTest extends TestCase {
     protected BrokerService brokerService;
     protected int timeOutInSeconds = 10;
+    protected final AtomicReference<Throwable> removeException = new AtomicReference<Throwable>();
 
 
     @Override
     protected void setUp() throws Exception {
         brokerService = new BrokerService();
         brokerService.setPersistent(false);
+        brokerService.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
+                        removeException.set(error);
+                        super.removeConnection(context, info, error);
+                    }
+                }
+        });
     }
 
     @Override
@@ -86,6 +98,7 @@ public class LinkStealingTest extends TestCase {
             exceptionFlag.set(true);
         }
         assertFalse(exceptionFlag.get());
+        assertNotNull(removeException.get());
 
     }
 }