You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/13 19:45:18 UTC

svn commit: r1481984 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java

Author: tabish
Date: Mon May 13 17:45:18 2013
New Revision: 1481984

URL: http://svn.apache.org/r1481984
Log:
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4531

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java   (with props)
Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=1481984&r1=1481983&r2=1481984&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Mon May 13 17:45:18 2013
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 
-public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
+public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
     protected ServerSocket serverSocket;
@@ -70,41 +70,36 @@ public class TcpTransportServer extends 
     protected long maxInactivityDuration = 30000;
     protected long maxInactivityDurationInitalDelay = 10000;
     protected int minmumWireFormatVersion;
-    protected boolean useQueueForAccept=true;
+    protected boolean useQueueForAccept = true;
 
     /**
-     * trace=true -> the Transport stack where this TcpTransport
-     * object will be, will have a TransportLogger layer
-     * trace=false -> the Transport stack where this TcpTransport
-     * object will be, will NOT have a TransportLogger layer, and therefore
-     * will never be able to print logging messages.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
+     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
+     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
+     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
+     * TransportConnector URIs.
      */
     protected boolean trace = false;
 
     protected int soTimeout = 0;
     protected int socketBufferSize = 64 * 1024;
-    protected int connectionTimeout =  30000;
+    protected int connectionTimeout = 30000;
 
     /**
-     * Name of the LogWriter implementation to use.
-     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
+     * Name of the LogWriter implementation to use. Names are mapped to classes in the
+     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
+     * set in Connection or TransportConnector URIs.
      */
     protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
     /**
-     * Specifies if the TransportLogger will be manageable by JMX or not.
-     * Also, as long as there is at least 1 TransportLogger which is manageable,
-     * a TransportLoggerControl MBean will me created.
+     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
+     * TransportLogger which is manageable, a TransportLoggerControl MBean will be created.
      */
     protected boolean dynamicManagement = false;
     /**
-     * startLogging=true -> the TransportLogger object of the Transport stack
-     * will initially write messages to the log.
-     * startLogging=false -> the TransportLogger object of the Transport stack
-     * will initially NOT write messages to the log.
-     * This parameter only has an effect if trace == true.
-     * This parameter is most probably set in Connection or TransportConnector URIs.
+     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
+     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
+     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
+     * TransportConnector URIs.
      */
     protected boolean startLogging = true;
     protected final ServerSocketFactory serverSocketFactory;
@@ -116,7 +111,8 @@ public class TcpTransportServer extends 
     protected int maximumConnections = Integer.MAX_VALUE;
     protected AtomicInteger currentTransportCount = new AtomicInteger();
 
-    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
+        URISyntaxException {
         super(location);
         this.transportFactory = transportFactory;
         this.serverSocketFactory = serverSocketFactory;
@@ -136,15 +132,16 @@ public class TcpTransportServer extends 
             throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
         }
         try {
-            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
-                .getFragment()));
+            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
+                bind.getQuery(), bind.getFragment()));
         } catch (URISyntaxException e) {
 
             // it could be that the host name contains invalid characters such
             // as _ on unix platforms
             // so lets try use the IP address instead
             try {
-                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
+                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
+                    bind.getQuery(), bind.getFragment()));
             } catch (URISyntaxException e2) {
                 throw IOExceptionSupport.create(e2);
             }
@@ -166,15 +163,16 @@ public class TcpTransportServer extends 
     }
 
     /**
-     * @param wireFormatFactory The wireFormatFactory to set.
+     * @param wireFormatFactory
+     *            The wireFormatFactory to set.
      */
     public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
         this.wireFormatFactory = wireFormatFactory;
     }
 
     /**
-     * Associates a broker info with the transport server so that the transport
-     * can do discovery advertisements of the broker.
+     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
+     * broker.
      *
      * @param brokerInfo
      */
@@ -246,7 +244,8 @@ public class TcpTransportServer extends 
     }
 
     /**
-     * @param backlog the backlog to set
+     * @param backlog
+     *            the backlog to set
      */
     public void setBacklog(int backlog) {
         this.backlog = backlog;
@@ -260,7 +259,8 @@ public class TcpTransportServer extends 
     }
 
     /**
-     * @param useQueueForAccept the useQueueForAccept to set
+     * @param useQueueForAccept
+     *            the useQueueForAccept to set
      */
     public void setUseQueueForAccept(boolean useQueueForAccept) {
         this.useQueueForAccept = useQueueForAccept;
@@ -281,7 +281,7 @@ public class TcpTransportServer extends 
                     } else {
                         if (useQueueForAccept) {
                             socketQueue.put(socket);
-                        }else {
+                        } else {
                             handleSocket(socket);
                         }
                     }
@@ -300,15 +300,14 @@ public class TcpTransportServer extends 
     }
 
     /**
-     * Allow derived classes to override the Transport implementation that this
-     * transport server creates.
+     * Allow derived classes to override the Transport implementation that this transport server creates.
      *
      * @param socket
      * @param format
      * @return
      * @throws IOException
      */
-    protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
+    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
         return new TcpTransport(format, socket);
     }
 
@@ -343,7 +342,7 @@ public class TcpTransportServer extends 
 
     @Override
     protected void doStart() throws Exception {
-        if(useQueueForAccept) {
+        if (useQueueForAccept) {
             Runnable run = new Runnable() {
                 @Override
                 public void run() {
@@ -363,11 +362,9 @@ public class TcpTransportServer extends 
                     }
                 }
             };
-            socketHandlerThread = new Thread(null, run,
-                    "ActiveMQ Transport Server Thread Handler: " + toString(),
-                    getStackSize());
+            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
             socketHandlerThread.setDaemon(true);
-            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
             socketHandlerThread.start();
         }
         super.doStart();
@@ -383,24 +380,22 @@ public class TcpTransportServer extends 
 
     @Override
     public InetSocketAddress getSocketAddress() {
-        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
     }
 
     protected final void handleSocket(Socket socket) {
+        boolean closeSocket = true;
         try {
             if (this.currentTransportCount.get() >= this.maximumConnections) {
-                throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
-                    "number of allowed client connections. See the 'maximumConnections' " +
-                    "property on the TCP transport configuration URI in the ActiveMQ " +
-                    "configuration file (e.g., activemq.xml)");
-
+                throw new ExceededMaximumConnectionsException(
+                    "Exceeded the maximum number of allowed client connections. See the '" +
+                    "maximumConnections' property on the TCP transport configuration URI " +
+                    "in the ActiveMQ configuration file (e.g., activemq.xml)");
             } else {
                 HashMap<String, Object> options = new HashMap<String, Object>();
                 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
-                options.put("maxInactivityDurationInitalDelay",
-                    Long.valueOf(maxInactivityDurationInitalDelay));
-                options.put("minmumWireFormatVersion",
-                    Integer.valueOf(minmumWireFormatVersion));
+                options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
+                options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
                 options.put("trace", Boolean.valueOf(trace));
                 options.put("soTimeout", Integer.valueOf(soTimeout));
                 options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
@@ -412,13 +407,13 @@ public class TcpTransportServer extends 
 
                 WireFormat format = wireFormatFactory.createWireFormat();
                 Transport transport = createTransport(socket, format);
+                closeSocket = false;
 
                 if (transport instanceof ServiceSupport) {
                     ((ServiceSupport) transport).addServiceListener(this);
                 }
 
-                Transport configuredTransport =
-                    transportFactory.serverConfigure( transport, format, options);
+                Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
 
                 getAcceptListener().onAccept(configuredTransport);
                 currentTransportCount.incrementAndGet();
@@ -426,6 +421,13 @@ public class TcpTransportServer extends 
         } catch (SocketTimeoutException ste) {
             // expect this to happen
         } catch (Exception e) {
+            if (closeSocket) {
+                try {
+                    socket.close();
+                } catch (Exception ignore) {
+                }
+            }
+
             if (!isStopping()) {
                 onAcceptError(e);
             } else if (!isStopped()) {
@@ -467,7 +469,8 @@ public class TcpTransportServer extends 
     }
 
     /**
-     * @param maximumConnections the maximumConnections to set
+     * @param maximumConnections
+     *            the maximumConnections to set
      */
     public void setMaximumConnections(int maximumConnections) {
         this.maximumConnections = maximumConnections;

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java?rev=1481984&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java Mon May 13 17:45:18 2013
@@ -0,0 +1,128 @@
+package org.apache.activemq.bugs;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.CountDownLatch;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
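+ * Test for AMQ-4531: checks that connection attempts beyond the connector's maximumConnections limit do not leak file descriptors.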
+ */
+public class AMQ4531Test extends TestCase {
+
+    private final Logger LOG = LoggerFactory.getLogger(AMQ4531Test.class);
+
+    private String connectionURI;
+    private MBeanServer mbeanServer;
+    private BrokerService broker;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString();
+        broker.setPersistent(false);
+        broker.start();
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        broker.stop();
+        super.tearDown();
+    }
+
+    /**
+     * Create the test case
+     *
+     * @param testName
+     *            name of the test case
+     */
+    public AMQ4531Test(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(AMQ4531Test.class);
+    }
+
+    public void testFDSLeak() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        connection.start();
+
+        int connections = 100;
+        final long original = openFileDescriptorCount();
+        LOG.info("FD count: " + original);
+        final CountDownLatch done = new CountDownLatch(connections);
+        for (int i = 0; i < connections; i++) {
+            new Thread("worker: " + i) {
+                @Override
+                public void run() {
+                    ActiveMQConnection connection = null;
+                    try {
+                        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
+                        connection = (ActiveMQConnection) factory.createConnection();
+                        connection.start();
+                    } catch (Exception e) {
+                        LOG.debug(getStack(e));
+                    } finally {
+                        try {
+                            connection.close();
+                        } catch (Exception e) {
+                            LOG.debug(getStack(e));
+                        }
+                        done.countDown();
+                        LOG.debug("Latch count down called.");
+                    }
+                }
+            }.start();
+        }
+
+        // Wait for all the clients to finish
+        LOG.info("Waiting for latch...");
+        done.await();
+        LOG.info("Latch complete.");
+        LOG.info("FD count: " + openFileDescriptorCount());
+
+        assertTrue("Too many open file descriptors: " + openFileDescriptorCount(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                long openFDs = openFileDescriptorCount();
+                LOG.info("Current FD count [{}], original FD count[{}]", openFDs, original);
+                return (openFDs - original) < 10;
+            }
+        }));
+    }
+
+    private long openFileDescriptorCount() throws Exception {
+        return ((Long) mbeanServer.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "OpenFileDescriptorCount")).longValue();
+    }
+
+    private String getStack(Throwable aThrowable) {
+        final Writer result = new StringWriter();
+        final PrintWriter printWriter = new PrintWriter(result);
+        aThrowable.printStackTrace(printWriter);
+        return result.toString();
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
------------------------------------------------------------------------------
    svn:eol-style = native