You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/13 19:45:18 UTC
svn commit: r1481984 - in /activemq/trunk:
activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
Author: tabish
Date: Mon May 13 17:45:18 2013
New Revision: 1481984
URL: http://svn.apache.org/r1481984
Log:
ix and test for: https://issues.apache.org/jira/browse/AMQ-4531
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java (with props)
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=1481984&r1=1481983&r2=1481984&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Mon May 13 17:45:18 2013
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
*
*/
-public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
+public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
protected ServerSocket serverSocket;
@@ -70,41 +70,36 @@ public class TcpTransportServer extends
protected long maxInactivityDuration = 30000;
protected long maxInactivityDurationInitalDelay = 10000;
protected int minmumWireFormatVersion;
- protected boolean useQueueForAccept=true;
+ protected boolean useQueueForAccept = true;
/**
- * trace=true -> the Transport stack where this TcpTransport
- * object will be, will have a TransportLogger layer
- * trace=false -> the Transport stack where this TcpTransport
- * object will be, will NOT have a TransportLogger layer, and therefore
- * will never be able to print logging messages.
- * This parameter is most probably set in Connection or TransportConnector URIs.
+ * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
+ * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
+ * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
+ * TransportConnector URIs.
*/
protected boolean trace = false;
protected int soTimeout = 0;
protected int socketBufferSize = 64 * 1024;
- protected int connectionTimeout = 30000;
+ protected int connectionTimeout = 30000;
/**
- * Name of the LogWriter implementation to use.
- * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
- * This parameter is most probably set in Connection or TransportConnector URIs.
+ * Name of the LogWriter implementation to use. Names are mapped to classes in the
+ * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
+ * set in Connection or TransportConnector URIs.
*/
protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
/**
- * Specifies if the TransportLogger will be manageable by JMX or not.
- * Also, as long as there is at least 1 TransportLogger which is manageable,
- * a TransportLoggerControl MBean will me created.
+ * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
+ * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
*/
protected boolean dynamicManagement = false;
/**
- * startLogging=true -> the TransportLogger object of the Transport stack
- * will initially write messages to the log.
- * startLogging=false -> the TransportLogger object of the Transport stack
- * will initially NOT write messages to the log.
- * This parameter only has an effect if trace == true.
- * This parameter is most probably set in Connection or TransportConnector URIs.
+ * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
+ * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
+ * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
+ * TransportConnector URIs.
*/
protected boolean startLogging = true;
protected final ServerSocketFactory serverSocketFactory;
@@ -116,7 +111,8 @@ public class TcpTransportServer extends
protected int maximumConnections = Integer.MAX_VALUE;
protected AtomicInteger currentTransportCount = new AtomicInteger();
- public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
+ URISyntaxException {
super(location);
this.transportFactory = transportFactory;
this.serverSocketFactory = serverSocketFactory;
@@ -136,15 +132,16 @@ public class TcpTransportServer extends
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
}
try {
- setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
- .getFragment()));
+ setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
+ bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e) {
// it could be that the host name contains invalid characters such
// as _ on unix platforms
// so lets try use the IP address instead
try {
- setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
+ setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
+ bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e2) {
throw IOExceptionSupport.create(e2);
}
@@ -166,15 +163,16 @@ public class TcpTransportServer extends
}
/**
- * @param wireFormatFactory The wireFormatFactory to set.
+ * @param wireFormatFactory
+ * The wireFormatFactory to set.
*/
public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
this.wireFormatFactory = wireFormatFactory;
}
/**
- * Associates a broker info with the transport server so that the transport
- * can do discovery advertisements of the broker.
+ * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
+ * broker.
*
* @param brokerInfo
*/
@@ -246,7 +244,8 @@ public class TcpTransportServer extends
}
/**
- * @param backlog the backlog to set
+ * @param backlog
+ * the backlog to set
*/
public void setBacklog(int backlog) {
this.backlog = backlog;
@@ -260,7 +259,8 @@ public class TcpTransportServer extends
}
/**
- * @param useQueueForAccept the useQueueForAccept to set
+ * @param useQueueForAccept
+ * the useQueueForAccept to set
*/
public void setUseQueueForAccept(boolean useQueueForAccept) {
this.useQueueForAccept = useQueueForAccept;
@@ -281,7 +281,7 @@ public class TcpTransportServer extends
} else {
if (useQueueForAccept) {
socketQueue.put(socket);
- }else {
+ } else {
handleSocket(socket);
}
}
@@ -300,15 +300,14 @@ public class TcpTransportServer extends
}
/**
- * Allow derived classes to override the Transport implementation that this
- * transport server creates.
+ * Allow derived classes to override the Transport implementation that this transport server creates.
*
* @param socket
* @param format
* @return
* @throws IOException
*/
- protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new TcpTransport(format, socket);
}
@@ -343,7 +342,7 @@ public class TcpTransportServer extends
@Override
protected void doStart() throws Exception {
- if(useQueueForAccept) {
+ if (useQueueForAccept) {
Runnable run = new Runnable() {
@Override
public void run() {
@@ -363,11 +362,9 @@ public class TcpTransportServer extends
}
}
};
- socketHandlerThread = new Thread(null, run,
- "ActiveMQ Transport Server Thread Handler: " + toString(),
- getStackSize());
+ socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
socketHandlerThread.setDaemon(true);
- socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+ socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
socketHandlerThread.start();
}
super.doStart();
@@ -383,24 +380,22 @@ public class TcpTransportServer extends
@Override
public InetSocketAddress getSocketAddress() {
- return (InetSocketAddress)serverSocket.getLocalSocketAddress();
+ return (InetSocketAddress) serverSocket.getLocalSocketAddress();
}
protected final void handleSocket(Socket socket) {
+ boolean closeSocket = true;
try {
if (this.currentTransportCount.get() >= this.maximumConnections) {
- throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
- "number of allowed client connections. See the 'maximumConnections' " +
- "property on the TCP transport configuration URI in the ActiveMQ " +
- "configuration file (e.g., activemq.xml)");
-
+ throw new ExceededMaximumConnectionsException(
+ "Exceeded the maximum number of allowed client connections. See the '" +
+ "maximumConnections' property on the TCP transport configuration URI " +
+ "in the ActiveMQ configuration file (e.g., activemq.xml)");
} else {
HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
- options.put("maxInactivityDurationInitalDelay",
- Long.valueOf(maxInactivityDurationInitalDelay));
- options.put("minmumWireFormatVersion",
- Integer.valueOf(minmumWireFormatVersion));
+ options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
+ options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
options.put("trace", Boolean.valueOf(trace));
options.put("soTimeout", Integer.valueOf(soTimeout));
options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
@@ -412,13 +407,13 @@ public class TcpTransportServer extends
WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format);
+ closeSocket = false;
if (transport instanceof ServiceSupport) {
((ServiceSupport) transport).addServiceListener(this);
}
- Transport configuredTransport =
- transportFactory.serverConfigure( transport, format, options);
+ Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
getAcceptListener().onAccept(configuredTransport);
currentTransportCount.incrementAndGet();
@@ -426,6 +421,13 @@ public class TcpTransportServer extends
} catch (SocketTimeoutException ste) {
// expect this to happen
} catch (Exception e) {
+ if (closeSocket) {
+ try {
+ socket.close();
+ } catch (Exception ignore) {
+ }
+ }
+
if (!isStopping()) {
onAcceptError(e);
} else if (!isStopped()) {
@@ -467,7 +469,8 @@ public class TcpTransportServer extends
}
/**
- * @param maximumConnections the maximumConnections to set
+ * @param maximumConnections
+ * the maximumConnections to set
*/
public void setMaximumConnections(int maximumConnections) {
this.maximumConnections = maximumConnections;
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java?rev=1481984&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java Mon May 13 17:45:18 2013
@@ -0,0 +1,128 @@
+package org.apache.activemq.bugs;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.CountDownLatch;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for simple App.
+ */
+public class AMQ4531Test extends TestCase {
+
+ private final Logger LOG = LoggerFactory.getLogger(AMQ4531Test.class);
+
+ private String connectionURI;
+ private MBeanServer mbeanServer;
+ private BrokerService broker;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ broker = new BrokerService();
+ connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString();
+ broker.setPersistent(false);
+ broker.start();
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ broker.stop();
+ super.tearDown();
+ }
+
+ /**
+ * Create the test case
+ *
+ * @param testName
+ * name of the test case
+ */
+ public AMQ4531Test(String testName) {
+ super(testName);
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite() {
+ return new TestSuite(AMQ4531Test.class);
+ }
+
+ public void testFDSLeak() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ connection.start();
+
+ int connections = 100;
+ final long original = openFileDescriptorCount();
+ LOG.info("FD count: " + original);
+ final CountDownLatch done = new CountDownLatch(connections);
+ for (int i = 0; i < connections; i++) {
+ new Thread("worker: " + i) {
+ @Override
+ public void run() {
+ ActiveMQConnection connection = null;
+ try {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
+ connection = (ActiveMQConnection) factory.createConnection();
+ connection.start();
+ } catch (Exception e) {
+ LOG.debug(getStack(e));
+ } finally {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ LOG.debug(getStack(e));
+ }
+ done.countDown();
+ LOG.debug("Latch count down called.");
+ }
+ }
+ }.start();
+ }
+
+ // Wait for all the clients to finish
+ LOG.info("Waiting for latch...");
+ done.await();
+ LOG.info("Latch complete.");
+ LOG.info("FD count: " + openFileDescriptorCount());
+
+ assertTrue("Too many open file descriptors: " + openFileDescriptorCount(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ long openFDs = openFileDescriptorCount();
+ LOG.info("Current FD count [{}], original FD count[{}]", openFDs, original);
+ return (openFDs - original) < 10;
+ }
+ }));
+ }
+
+ private long openFileDescriptorCount() throws Exception {
+ return ((Long) mbeanServer.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "OpenFileDescriptorCount")).longValue();
+ }
+
+ private String getStack(Throwable aThrowable) {
+ final Writer result = new StringWriter();
+ final PrintWriter printWriter = new PrintWriter(result);
+ aThrowable.printStackTrace(printWriter);
+ return result.toString();
+ }
+}
Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
------------------------------------------------------------------------------
svn:eol-style = native