You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/16 18:13:36 UTC
git commit: Fix for AMQ-4889, potential leak of ProxyConnectors
Updated Branches:
refs/heads/trunk cb5c29d02 -> 257710ba1
Fix for AMQ-4889, potential leak of ProxyConnectors
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/257710ba
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/257710ba
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/257710ba
Branch: refs/heads/trunk
Commit: 257710ba1a40e0ac78f5f8035d9663775e122d41
Parents: cb5c29d
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Mon Dec 16 18:13:27 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Mon Dec 16 18:13:27 2013 +0100
----------------------------------------------------------------------
.../apache/activemq/proxy/ProxyConnection.java | 74 +++++++++++--
.../apache/activemq/proxy/ProxyConnector.java | 48 +++++---
.../org/apache/activemq/proxy/AMQ4889Test.java | 111 +++++++++++++++++++
3 files changed, 210 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/257710ba/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
index 789dbbf..3cc2c18 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
@@ -16,9 +16,6 @@
*/
package org.apache.activemq.proxy;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.activemq.Service;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
@@ -29,14 +26,17 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
class ProxyConnection implements Service {
private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
- private final Transport localTransport;
- private final Transport remoteTransport;
- private AtomicBoolean shuttingDown = new AtomicBoolean(false);
- private AtomicBoolean running = new AtomicBoolean(false);
+ protected final Transport localTransport;
+ protected final Transport remoteTransport;
+ private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
+ private final AtomicBoolean running = new AtomicBoolean(false);
public ProxyConnection(Transport localTransport, Transport remoteTransport) {
this.localTransport = localTransport;
@@ -53,12 +53,14 @@ class ProxyConnection implements Service {
}
}
+ @Override
public void start() throws Exception {
if (!running.compareAndSet(false, true)) {
return;
}
this.localTransport.setTransportListener(new DefaultTransportListener() {
+ @Override
public void onCommand(Object command) {
boolean shutdown = false;
if (command.getClass() == ShutdownInfo.class) {
@@ -81,12 +83,14 @@ class ProxyConnection implements Service {
}
}
+ @Override
public void onException(IOException error) {
onFailure(error);
}
});
this.remoteTransport.setTransportListener(new DefaultTransportListener() {
+ @Override
public void onCommand(Object command) {
try {
// skipping WireFormat infos
@@ -99,6 +103,7 @@ class ProxyConnection implements Service {
}
}
+ @Override
public void onException(IOException error) {
onFailure(error);
}
@@ -108,15 +113,68 @@ class ProxyConnection implements Service {
remoteTransport.start();
}
+ @Override
public void stop() throws Exception {
if (!running.compareAndSet(true, false)) {
return;
}
shuttingDown.set(true);
ServiceStopper ss = new ServiceStopper();
- ss.stop(localTransport);
ss.stop(remoteTransport);
+ ss.stop(localTransport);
ss.throwFirstException();
}
+
+    /**
+     * Compares proxy connections by the remote addresses reported by their
+     * local and remote transports, not by transport identity. A missing
+     * transport or a null address is treated as the empty string.
+     *
+     * @param arg the object to compare with; may be null
+     * @return true if arg is a ProxyConnection whose local and remote
+     *         addresses both match this connection's
+     */
+    @Override
+    public boolean equals(Object arg) {
+        // instanceof is false for null, so no separate null check is needed
+        if (!(arg instanceof ProxyConnection)) {
+            return false;
+        }
+        ProxyConnection other = (ProxyConnection) arg;
+        return addressOf(localTransport).equals(addressOf(other.localTransport))
+            && addressOf(remoteTransport).equals(addressOf(other.remoteTransport));
+    }
+
+    /**
+     * Hashes exactly the two address strings that equals() compares, so equal
+     * connections always produce equal hash codes.
+     *
+     * @return hash derived from the local and remote addresses
+     */
+    @Override
+    public int hashCode() {
+        int hash = 17;
+        // same normalisation as equals(): a null address and "" hash alike
+        hash = 31 * hash + addressOf(localTransport).hashCode();
+        hash = 31 * hash + addressOf(remoteTransport).hashCode();
+        return hash;
+    }
+
+    /**
+     * Returns the transport's remote address, or "" when the transport or its
+     * address is null. The address is fetched once and the null test is made
+     * on that same value.
+     */
+    private static String addressOf(Transport transport) {
+        String address = transport == null ? null : transport.getRemoteAddress();
+        return address == null ? "" : address;
+    }
+
+    @Override
+    public String toString() { // diagnostic text: both transports plus the two state flags
+        String transports = "localTransport=" + localTransport + ", remoteTransport=" + remoteTransport;
+        String flags = "shuttingDown=" + shuttingDown.get() + ", running=" + running.get();
+        return "ProxyConnection [" + transports + ", " + flags + "]";
+    }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/257710ba/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
index d82911b..b36faaf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
@@ -16,11 +16,6 @@
*/
package org.apache.activemq.proxy;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.Transport;
@@ -32,10 +27,14 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+
/**
* @org.apache.xbean.XBean
- *
- *
*/
public class ProxyConnector implements Service {
@@ -45,45 +44,59 @@ public class ProxyConnector implements Service {
private URI remote;
private URI localUri;
private String name;
+
/**
* Should we proxy commands to the local broker using VM transport as well?
*/
private boolean proxyToLocalBroker = true;
-
+
private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
+ @Override
public void start() throws Exception {
this.getServer().setAcceptListener(new TransportAcceptListener() {
+ @Override
public void onAccept(Transport localTransport) {
+ ProxyConnection connection = null;
try {
- Transport remoteTransport = createRemoteTransport();
- ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport);
- connections.add(connection);
+ Transport remoteTransport = createRemoteTransport(localTransport);
+ connection = new ProxyConnection(localTransport, remoteTransport);
connection.start();
+ connections.add(connection);
} catch (Exception e) {
onAcceptError(e);
+ try {
+ if (connection != null) {
+ connection.stop();
+ }
+ } catch (Exception eoc) {
+ LOG.error("Could not close broken connection: ", eoc);
+ }
}
}
+ @Override
public void onAcceptError(Exception error) {
LOG.error("Could not accept connection: ", error);
}
});
getServer().start();
LOG.info("Proxy Connector {} started", getName());
-
}
+ @Override
public void stop() throws Exception {
ServiceStopper ss = new ServiceStopper();
if (this.server != null) {
ss.stop(this.server);
}
+
for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
LOG.info("Connector stopped: Stopping proxy.");
ss.stop(iter.next());
}
+ connections.clear();
ss.throwFirstException();
LOG.info("Proxy Connector {} stopped", getName());
}
@@ -133,11 +146,11 @@ public class ProxyConnector implements Service {
return TransportFactory.bind(bind);
}
- private Transport createRemoteTransport() throws Exception {
+ private Transport createRemoteTransport(final Transport local) throws Exception {
Transport transport = TransportFactory.compositeConnect(remote);
CompositeTransport ct = transport.narrow(CompositeTransport.class);
if (ct != null && localUri != null && proxyToLocalBroker) {
- ct.add(false,new URI[] {localUri});
+ ct.add(false, new URI[] { localUri });
}
// Add a transport filter so that we can track the transport life cycle
@@ -146,7 +159,9 @@ public class ProxyConnector implements Service {
public void stop() throws Exception {
LOG.info("Stopping proxy.");
super.stop();
- connections.remove(this);
+ ProxyConnection dummy = new ProxyConnection(local, this);
+ LOG.debug("Removing proxyConnection {}", dummy.toString());
+ connections.remove(dummy);
}
};
return transport;
@@ -175,4 +190,7 @@ public class ProxyConnector implements Service {
this.proxyToLocalBroker = proxyToLocalBroker;
}
+    protected Integer getConnectionCount() { // boxed size of the tracked-connection list
+        return Integer.valueOf(connections.size());
+    }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/257710ba/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java
new file mode 100644
index 0000000..bfe73ec
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java
@@ -0,0 +1,111 @@
+package org.apache.activemq.proxy;
+
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSSecurityException;
+import javax.jms.Session;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Regression test for AMQ-4889: failed logins through a ProxyConnector must not leak ProxyConnections. */
+public class AMQ4889Test {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4889Test.class);
+
+    public static final String USER = "user";
+    public static final String GOOD_USER_PASSWORD = "password";
+    public static final String WRONG_PASSWORD = "wrongPassword";
+    public static final String PROXY_URI = "tcp://localhost:6002";
+    public static final String LOCAL_URI = "tcp://localhost:6001";
+    private static final int ITERATIONS = 100;
+
+    protected BrokerService brokerService;
+    private ProxyConnector proxyConnector;
+    protected TransportConnector transportConnector;
+    protected ConnectionFactory connectionFactory;
+    private final List<Connection> openedConnections = new ArrayList<Connection>(); // closed in tearDown()
+
+    /** Starts a non-persistent, authenticating broker on LOCAL_URI with a proxy connector bound to PROXY_URI. */
+    protected BrokerService createBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setPlugins(new BrokerPlugin[] { configureAuthentication() });
+        transportConnector = brokerService.addConnector(LOCAL_URI);
+        proxyConnector = new ProxyConnector();
+        proxyConnector.setName("proxy"); // TODO rename
+        proxyConnector.setBind(new URI(PROXY_URI));
+        proxyConnector.setRemote(new URI(LOCAL_URI));
+        brokerService.addProxyConnector(proxyConnector);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        return brokerService;
+    }
+
+    /** Builds an authentication plugin whose only user is USER with GOOD_USER_PASSWORD. */
+    protected BrokerPlugin configureAuthentication() throws Exception {
+        List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
+        users.add(new AuthenticationUser(USER, GOOD_USER_PASSWORD, "users"));
+        return new SimpleAuthenticationPlugin(users);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        connectionFactory = new ActiveMQConnectionFactory(PROXY_URI);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for (Connection connection : openedConnections) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                LOG.debug("Ignoring failure while closing test connection", e);
+            }
+        }
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void testForConnectionLeak() throws Exception {
+        int expectedConnectionCount = 0;
+        for (int i = 0; i < ITERATIONS; i++) {
+            boolean badPassword = i % 2 == 0;
+            String password = badPassword ? WRONG_PASSWORD : GOOD_USER_PASSWORD;
+            LOG.debug("Iteration {} adding {} connection", i, badPassword ? "bad" : "good");
+            try {
+                Connection connection = connectionFactory.createConnection(USER, password);
+                openedConnections.add(connection);
+                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                if (badPassword) {
+                    fail("createSession should fail");
+                }
+                expectedConnectionCount++;
+            } catch (JMSSecurityException e) {
+                if (!badPassword) {
+                    throw e; // only the wrong password may be rejected
+                }
+            }
+            LOG.debug("Iteration {} Connections? {}", i, proxyConnector.getConnectionCount());
+            Thread.sleep(50); // Need to wait for remove to finish
+            assertEquals(expectedConnectionCount, proxyConnector.getConnectionCount().intValue());
+        }
+    }
+}