You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/05/13 15:13:34 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6209
Repository: activemq
Updated Branches:
refs/heads/master d7b5a62bb -> ff9987226
https://issues.apache.org/jira/browse/AMQ-6209
Better management of shared resources between the background run thread
and the main start / stop thread. Makes sure to cleanup all resources
before finally throwing on stop to prevent leaking and resources.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ff998722
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ff998722
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ff998722
Branch: refs/heads/master
Commit: ff99872263981982bb1ebce93c07bfb8a28d4a06
Parents: d7b5a62
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 13 11:13:11 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 13 11:13:28 2016 -0400
----------------------------------------------------------------------
.../transport/tcp/TcpTransportServer.java | 214 ++++++++++++-------
1 file changed, 132 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ff998722/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index f071522..33d9c72 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -26,6 +26,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@@ -50,8 +51,6 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport;
@@ -69,8 +68,9 @@ import org.slf4j.LoggerFactory;
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
- protected ServerSocket serverSocket;
- protected Selector selector;
+
+ protected volatile ServerSocket serverSocket;
+ protected volatile Selector selector;
protected int backlog = 5000;
protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
protected final TcpTransportFactory transportFactory;
@@ -113,14 +113,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
*/
protected boolean startLogging = true;
protected final ServerSocketFactory serverSocketFactory;
- protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+ protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
protected Thread socketHandlerThread;
/**
* The maximum number of sockets allowed for this server
*/
protected int maximumConnections = Integer.MAX_VALUE;
- protected AtomicInteger currentTransportCount = new AtomicInteger();
+ protected final AtomicInteger currentTransportCount = new AtomicInteger();
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
URISyntaxException {
@@ -137,8 +137,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
InetAddress addr = InetAddress.getByName(host);
try {
- this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
- configureServerSocket(this.serverSocket);
+ serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+ configureServerSocket(serverSocket);
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
}
@@ -146,7 +146,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e) {
-
// it could be that the host name contains invalid characters such
// as _ on unix platforms so lets try use the IP address instead
try {
@@ -302,87 +301,114 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
*/
@Override
public void run() {
- final ServerSocketChannel chan = serverSocket.getChannel();
- if (chan != null) {
+ if (!isStopped() && !isStopping()) {
+ final ServerSocket serverSocket = this.serverSocket;
+ if (serverSocket == null) {
+ onAcceptError(new IOException("Server started without a valid ServerSocket"));
+ }
+
+ final ServerSocketChannel channel = serverSocket.getChannel();
+ if (channel != null) {
+ doRunWithServerSocketChannel(channel);
+ } else {
+ doRunWithServerSocket(serverSocket);
+ }
+ }
+ }
+
+ private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
+ try {
+ channel.configureBlocking(false);
+ final Selector selector = Selector.open();
+
try {
- chan.configureBlocking(false);
- selector = Selector.open();
- chan.register(selector, SelectionKey.OP_ACCEPT);
- while (!isStopped()) {
- int count = selector.select(10);
-
- if (count == 0) {
- continue;
- }
+ channel.register(selector, SelectionKey.OP_ACCEPT);
+ } catch (ClosedChannelException ex) {
+ try {
+ selector.close();
+ } catch (IOException ignore) {}
- Set<SelectionKey> keys = selector.selectedKeys();
+ throw ex;
+ }
+
+ // Update object instance for later cleanup.
+ this.selector = selector;
+
+ while (!isStopped()) {
+ int count = selector.select(10);
+
+ if (count == 0) {
+ continue;
+ }
- for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
- final SelectionKey key = i.next();
- if (key.isAcceptable()) {
- try {
- SocketChannel sc = chan.accept();
- if (sc != null) {
- if (isStopped() || getAcceptListener() == null) {
- sc.close();
+ Set<SelectionKey> keys = selector.selectedKeys();
+
+ for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
+ final SelectionKey key = i.next();
+ if (key.isAcceptable()) {
+ try {
+ SocketChannel sc = channel.accept();
+ if (sc != null) {
+ if (isStopped() || getAcceptListener() == null) {
+ sc.close();
+ } else {
+ if (useQueueForAccept) {
+ socketQueue.put(sc.socket());
} else {
- if (useQueueForAccept) {
- socketQueue.put(sc.socket());
- } else {
- handleSocket(sc.socket());
- }
+ handleSocket(sc.socket());
}
}
+ }
- } catch (SocketTimeoutException ste) {
- // expect this to happen
- } catch (Exception e) {
- e.printStackTrace();
- if (!isStopping()) {
- onAcceptError(e);
- } else if (!isStopped()) {
- LOG.warn("run()", e);
- onAcceptError(e);
- }
+ } catch (SocketTimeoutException ste) {
+ // expect this to happen
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (!isStopping()) {
+ onAcceptError(e);
+ } else if (!isStopped()) {
+ LOG.warn("run()", e);
+ onAcceptError(e);
}
}
- i.remove();
}
-
- }
- } catch (IOException ex) {
- if (selector != null) {
- try {
- selector.close();
- } catch (IOException ioe) {}
- selector = null;
+ i.remove();
}
}
- } else {
- while (!isStopped()) {
- Socket socket = null;
- try {
- socket = serverSocket.accept();
- if (socket != null) {
- if (isStopped() || getAcceptListener() == null) {
- socket.close();
+ } catch (IOException ex) {
+ if (!isStopping()) {
+ onAcceptError(ex);
+ } else if (!isStopped()) {
+ LOG.warn("run()", ex);
+ onAcceptError(ex);
+ }
+ }
+ }
+
+ private void doRunWithServerSocket(final ServerSocket serverSocket) {
+ while (!isStopped()) {
+ Socket socket = null;
+ try {
+ socket = serverSocket.accept();
+ if (socket != null) {
+ if (isStopped() || getAcceptListener() == null) {
+ socket.close();
+ } else {
+ if (useQueueForAccept) {
+ socketQueue.put(socket);
} else {
- if (useQueueForAccept) {
- socketQueue.put(socket);
- } else {
- handleSocket(socket);
- }
+ handleSocket(socket);
}
}
- } catch (SocketTimeoutException ste) {
- // expect this to happen
- } catch (Exception e) {
- if (!isStopping()) {
- onAcceptError(e);
- } else if (!isStopped()) {
- LOG.warn("run()", e);
- onAcceptError(e);
- }
+ }
+ } catch (SocketTimeoutException ste) {
+ // expect this to happen
+ } catch (Exception e) {
+ if (!isStopping()) {
+ onAcceptError(e);
+ } else if (!isStopped()) {
+ LOG.warn("run()", e);
+ onAcceptError(e);
}
}
}
@@ -472,19 +498,43 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
- if (selector != null) {
- selector.close();
- selector = null;
+ Exception firstFailure = null;
+
+ try {
+ final Selector selector = this.selector;
+ if (selector != null) {
+ this.selector = null;
+ selector.close();
+ }
+ } catch (Exception error) {
}
- if (serverSocket != null) {
- serverSocket.close();
- serverSocket = null;
+
+ try {
+ final ServerSocket serverSocket = this.serverSocket;
+ if (serverSocket != null) {
+ this.serverSocket = null;
+ serverSocket.close();
+ }
+ } catch (Exception error) {
+ firstFailure = error;
}
+
if (socketHandlerThread != null) {
socketHandlerThread.interrupt();
socketHandlerThread = null;
}
- super.doStop(stopper);
+
+ try {
+ super.doStop(stopper);
+ } catch (Exception error) {
+ if (firstFailure != null) {
+ firstFailure = error;
+ }
+ }
+
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
}
@Override