You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/16 18:13:36 UTC

git commit: Fix for AMQ-4889, potential leak of ProxyConnectors

Updated Branches:
  refs/heads/trunk cb5c29d02 -> 257710ba1


Fix for AMQ-4889, potential leak of ProxyConnectors


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/257710ba
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/257710ba
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/257710ba

Branch: refs/heads/trunk
Commit: 257710ba1a40e0ac78f5f8035d9663775e122d41
Parents: cb5c29d
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Mon Dec 16 18:13:27 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Mon Dec 16 18:13:27 2013 +0100

----------------------------------------------------------------------
 .../apache/activemq/proxy/ProxyConnection.java  |  74 +++++++++++--
 .../apache/activemq/proxy/ProxyConnector.java   |  48 +++++---
 .../org/apache/activemq/proxy/AMQ4889Test.java  | 111 +++++++++++++++++++
 3 files changed, 210 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/257710ba/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
index 789dbbf..3cc2c18 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.proxy;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.WireFormatInfo;
@@ -29,14 +26,17 @@ import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 class ProxyConnection implements Service {
 
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
 
-    private final Transport localTransport;
-    private final Transport remoteTransport;
-    private AtomicBoolean shuttingDown = new AtomicBoolean(false);
-    private AtomicBoolean running = new AtomicBoolean(false);
+    protected final Transport localTransport;
+    protected final Transport remoteTransport;
+    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
+    private final AtomicBoolean running = new AtomicBoolean(false);
 
     public ProxyConnection(Transport localTransport, Transport remoteTransport) {
         this.localTransport = localTransport;
@@ -53,12 +53,14 @@ class ProxyConnection implements Service {
         }
     }
 
+    @Override
     public void start() throws Exception {
         if (!running.compareAndSet(false, true)) {
             return;
         }
 
         this.localTransport.setTransportListener(new DefaultTransportListener() {
+            @Override
             public void onCommand(Object command) {
                 boolean shutdown = false;
                 if (command.getClass() == ShutdownInfo.class) {
@@ -81,12 +83,14 @@ class ProxyConnection implements Service {
                 }
             }
 
+            @Override
             public void onException(IOException error) {
                 onFailure(error);
             }
         });
 
         this.remoteTransport.setTransportListener(new DefaultTransportListener() {
+            @Override
             public void onCommand(Object command) {
                 try {
                     // skipping WireFormat infos
@@ -99,6 +103,7 @@ class ProxyConnection implements Service {
                 }
             }
 
+            @Override
             public void onException(IOException error) {
                 onFailure(error);
             }
@@ -108,15 +113,68 @@ class ProxyConnection implements Service {
         remoteTransport.start();
     }
 
+    @Override
     public void stop() throws Exception {
         if (!running.compareAndSet(true, false)) {
             return;
         }
         shuttingDown.set(true);
         ServiceStopper ss = new ServiceStopper();
-        ss.stop(localTransport);
         ss.stop(remoteTransport);
+        ss.stop(localTransport);
         ss.throwFirstException();
     }
 
+
+    @Override
+    public boolean equals(Object arg) { // value equality based on the remote-address strings of both transports
+        if (arg == null || !(arg instanceof ProxyConnection)) {
+            return false;
+        } else {
+            ProxyConnection other = (ProxyConnection) arg;
+            String otherRemote = ""; // "" stands in for a null transport or a null remote address
+            String otherLocal = "";
+            String thisRemote = "";
+            String thisLocal = "";
+            // Collect the remote address of each transport on both sides, keeping "" where one is unavailable.
+            if (other.localTransport != null && other.localTransport.getRemoteAddress() != null) {
+                otherLocal = other.localTransport.getRemoteAddress();
+            }
+            if (other.remoteTransport != null && other.remoteTransport.getRemoteAddress() != null) {
+                otherRemote = other.remoteTransport.getRemoteAddress();
+            }
+            if (this.remoteTransport != null && this.remoteTransport.getRemoteAddress() != null) {
+                thisRemote = this.remoteTransport.getRemoteAddress();
+            }
+            if (this.localTransport != null && this.localTransport.getRemoteAddress() != null) {
+                thisLocal = this.localTransport.getRemoteAddress();
+            }
+            // Equal only when the remote-side addresses match AND the local-side addresses match.
+            if (otherRemote.equals(thisRemote) && otherLocal.equals(thisLocal)) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+
+    @Override
+    public int hashCode() { // must agree with equals(): derived only from the two remote-address strings
+        int hash = 17;
+        if (localTransport != null && localTransport.getRemoteAddress() != null) {
+            hash = 31 * hash + localTransport.getRemoteAddress().hashCode(); // plain assignment, not +=
+        }
+        if (remoteTransport != null && remoteTransport.getRemoteAddress() != null) {
+            hash = 31 * hash + remoteTransport.getRemoteAddress().hashCode(); // hash the address, not the transport object
+        }
+        return hash;
+    }
+
+    @Override
+    public String toString() { // diagnostic text: both transports followed by the two lifecycle flags
+        final StringBuilder text = new StringBuilder("ProxyConnection [localTransport=").append(localTransport);
+        text.append(", remoteTransport=").append(remoteTransport);
+        text.append(", shuttingDown=").append(shuttingDown.get());
+        return text.append(", running=").append(running.get()).append("]").toString(); }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/257710ba/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
index d82911b..b36faaf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
@@ -16,11 +16,6 @@
  */
 package org.apache.activemq.proxy;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.activemq.Service;
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.Transport;
@@ -32,10 +27,14 @@ import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 /**
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class ProxyConnector implements Service {
 
@@ -45,45 +44,59 @@ public class ProxyConnector implements Service {
     private URI remote;
     private URI localUri;
     private String name;
+
     /**
      * Should we proxy commands to the local broker using VM transport as well?
      */
     private boolean proxyToLocalBroker = true;
-    
+
     private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
 
+    @Override
     public void start() throws Exception {
 
         this.getServer().setAcceptListener(new TransportAcceptListener() {
+            @Override
             public void onAccept(Transport localTransport) {
+                ProxyConnection connection = null;
                 try {
-                    Transport remoteTransport = createRemoteTransport();
-                    ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport);
-                    connections.add(connection);
+                    Transport remoteTransport = createRemoteTransport(localTransport);
+                    connection = new ProxyConnection(localTransport, remoteTransport);
                     connection.start();
+                    connections.add(connection);
                 } catch (Exception e) {
                     onAcceptError(e);
+                    try {
+                        if (connection != null) {
+                            connection.stop();
+                        }
+                    } catch (Exception eoc) {
+                        LOG.error("Could not close broken connection: ", eoc);
+                    }
                 }
             }
 
+            @Override
             public void onAcceptError(Exception error) {
                 LOG.error("Could not accept connection: ", error);
             }
         });
         getServer().start();
         LOG.info("Proxy Connector {} started", getName());
-
     }
 
+    @Override
     public void stop() throws Exception {
         ServiceStopper ss = new ServiceStopper();
         if (this.server != null) {
             ss.stop(this.server);
         }
+
         for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
             LOG.info("Connector stopped: Stopping proxy.");
             ss.stop(iter.next());
         }
+        connections.clear();
         ss.throwFirstException();
         LOG.info("Proxy Connector {} stopped", getName());
     }
@@ -133,11 +146,11 @@ public class ProxyConnector implements Service {
         return TransportFactory.bind(bind);
     }
 
-    private Transport createRemoteTransport() throws Exception {
+    private Transport createRemoteTransport(final Transport local) throws Exception {
         Transport transport = TransportFactory.compositeConnect(remote);
         CompositeTransport ct = transport.narrow(CompositeTransport.class);
         if (ct != null && localUri != null && proxyToLocalBroker) {
-            ct.add(false,new URI[] {localUri});
+            ct.add(false, new URI[] { localUri });
         }
 
         // Add a transport filter so that we can track the transport life cycle
@@ -146,7 +159,9 @@ public class ProxyConnector implements Service {
             public void stop() throws Exception {
                 LOG.info("Stopping proxy.");
                 super.stop();
-                connections.remove(this);
+                ProxyConnection dummy = new ProxyConnection(local, this);
+                LOG.debug("Removing proxyConnection {}", dummy.toString());
+                connections.remove(dummy);
             }
         };
         return transport;
@@ -175,4 +190,7 @@ public class ProxyConnector implements Service {
         this.proxyToLocalBroker = proxyToLocalBroker;
     }
 
+    protected Integer getConnectionCount() { // number of ProxyConnection entries currently held in 'connections'
+        return connections.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/257710ba/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java
new file mode 100644
index 0000000..bfe73ec
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java
@@ -0,0 +1,111 @@
+package org.apache.activemq.proxy;
+
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSSecurityException;
+import javax.jms.Session;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class AMQ4889Test { // regression test for AMQ-4889: failed logins through the proxy must not leave tracked connections behind
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4889Test.class);
+
+    public static final String USER = "user";
+    public static final String GOOD_USER_PASSWORD = "password";
+    public static final String WRONG_PASSWORD = "wrongPassword";
+    public static final String PROXY_URI = "tcp://localhost:6002"; // address the ProxyConnector binds to; clients connect here
+    public static final String LOCAL_URI = "tcp://localhost:6001"; // broker transport connector; also the proxy's remote target
+
+    protected BrokerService brokerService;
+    private ProxyConnector proxyConnector;
+    protected TransportConnector transportConnector;
+    protected ConnectionFactory connectionFactory;
+
+    private static final Integer ITERATIONS = 100;
+    /** Builds and starts a non-persistent broker with authentication, a transport connector and a proxy connector. */
+    protected BrokerService createBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        // Install the authentication plugin as the broker's only plugin.
+        ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
+        BrokerPlugin authenticationPlugin = configureAuthentication();
+        plugins.add(authenticationPlugin);
+        BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
+        brokerService.setPlugins(plugins.toArray(array));
+        // Broker listens on LOCAL_URI; the proxy binds to PROXY_URI and uses LOCAL_URI as its remote.
+        transportConnector = brokerService.addConnector(LOCAL_URI);
+        proxyConnector = new ProxyConnector();
+        proxyConnector.setName("proxy");    // TODO rename
+        proxyConnector.setBind(new URI(PROXY_URI));
+        proxyConnector.setRemote(new URI(LOCAL_URI));
+        brokerService.addProxyConnector(proxyConnector);
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        return brokerService;
+    }
+    /** Creates an authentication plugin that knows a single user, USER / GOOD_USER_PASSWORD. */
+    protected BrokerPlugin configureAuthentication() throws Exception {
+        List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
+        users.add(new AuthenticationUser(USER, GOOD_USER_PASSWORD, "users")); // third argument presumably names the user's group(s) - confirm
+        SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
+
+        return authenticationPlugin;
+    }
+    /** Starts the broker and points the client connection factory at the proxy, not at the broker directly. */
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        connectionFactory = new ActiveMQConnectionFactory(PROXY_URI);
+    }
+    /** Stops the broker and waits until it has fully stopped. */
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    /** Alternates bad- and good-credential connections and checks the proxy's tracked count after each one. */
+    @Test(timeout = 1 * 60 * 1000)
+    public void testForConnectionLeak() throws Exception {
+        Integer expectedConnectionCount = 0; // counts only the good connections, which are left open
+        for (int i=0; i < ITERATIONS; i++) {
+            try {
+                if (i % 2 == 0) {
+                    LOG.debug("Iteration {} adding bad connection", i);
+                    Connection connection = connectionFactory.createConnection(USER, WRONG_PASSWORD);  // wrong password: expected to be rejected
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    fail("createSession should fail");
+                } else {
+                    LOG.debug("Iteration {} adding good connection", i);
+                    Connection connection = connectionFactory.createConnection(USER, GOOD_USER_PASSWORD);
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    expectedConnectionCount++;
+                }
+                // reaching this point means the good-credential branch completed without an exception
+            } catch (JMSSecurityException e) { // deliberately ignored: the expected outcome of the wrong-password iterations
+            }
+            LOG.debug("Iteration {} Connections? {}", i, proxyConnector.getConnectionCount());
+            Thread.sleep(50);   // Need to wait for remove to finish
+            assertEquals(expectedConnectionCount, proxyConnector.getConnectionCount());
+        }
+    }
+}