You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/05/13 15:29:36 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6291
Repository: activemq
Updated Branches:
refs/heads/activemq-5.13.x 3a4cdeb91 -> 52d2b1578
https://issues.apache.org/jira/browse/AMQ-6291
Better management of shared resources between the background run thread
and the main start / stop thread. Makes sure to cleanup all resources
before finally throwing on stop to prevent leaking and resources.
(cherry picked from commit ff99872263981982bb1ebce93c07bfb8a28d4a06)
Conflicts:
activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52d2b157
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52d2b157
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52d2b157
Branch: refs/heads/activemq-5.13.x
Commit: 52d2b157808c4dd11fe39203e0caad44d4e64a86
Parents: 3a4cdeb
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 13 11:13:11 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 13 11:29:25 2016 -0400
----------------------------------------------------------------------
.../transport/tcp/TcpTransportServer.java | 194 ++++++++++++-------
1 file changed, 127 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/52d2b157/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 43e6ae0..a4ed0ae 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -26,10 +26,14 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -47,8 +51,6 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport;
@@ -66,8 +68,8 @@ import org.slf4j.LoggerFactory;
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
- protected ServerSocket serverSocket;
- protected SelectorSelection selector;
+ protected volatile ServerSocket serverSocket;
+ protected volatile Selector selector;
protected int backlog = 5000;
protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
protected final TcpTransportFactory transportFactory;
@@ -110,14 +112,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
*/
protected boolean startLogging = true;
protected final ServerSocketFactory serverSocketFactory;
- protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+ protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
protected Thread socketHandlerThread;
/**
* The maximum number of sockets allowed for this server
*/
protected int maximumConnections = Integer.MAX_VALUE;
- protected AtomicInteger currentTransportCount = new AtomicInteger();
+ protected final AtomicInteger currentTransportCount = new AtomicInteger();
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
URISyntaxException {
@@ -134,8 +136,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
InetAddress addr = InetAddress.getByName(host);
try {
- this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
- configureServerSocket(this.serverSocket);
+ serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+ configureServerSocket(serverSocket);
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
}
@@ -143,7 +145,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e) {
-
// it could be that the host name contains invalid characters such
// as _ on unix platforms so lets try use the IP address instead
try {
@@ -299,15 +300,52 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
*/
@Override
public void run() {
- final ServerSocketChannel chan = serverSocket.getChannel();
- if (chan != null) {
+ if (!isStopped() && !isStopping()) {
+ final ServerSocket serverSocket = this.serverSocket;
+ if (serverSocket == null) {
+ onAcceptError(new IOException("Server started without a valid ServerSocket"));
+ }
+
+ final ServerSocketChannel channel = serverSocket.getChannel();
+ if (channel != null) {
+ doRunWithServerSocketChannel(channel);
+ } else {
+ doRunWithServerSocket(serverSocket);
+ }
+ }
+ }
+
+ private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
+ try {
+ channel.configureBlocking(false);
+ final Selector selector = Selector.open();
+
try {
- chan.configureBlocking(false);
- selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() {
- @Override
- public void onSelect(SelectorSelection sel) {
+ channel.register(selector, SelectionKey.OP_ACCEPT);
+ } catch (ClosedChannelException ex) {
+ try {
+ selector.close();
+ } catch (IOException ignore) {}
+
+ throw ex;
+ }
+
+ // Update object instance for later cleanup.
+ this.selector = selector;
+
+ while (!isStopped()) {
+ int count = selector.select(10);
+ if (count == 0) {
+ continue;
+ }
+
+ Set<SelectionKey> keys = selector.selectedKeys();
+
+ for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
+ final SelectionKey key = i.next();
+ if (key.isAcceptable()) {
try {
- SocketChannel sc = chan.accept();
+ SocketChannel sc = channel.accept();
if (sc != null) {
if (isStopped() || getAcceptListener() == null) {
sc.close();
@@ -319,56 +357,55 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
}
}
+ } catch (SocketTimeoutException ste) {
+ // expect this to happen
} catch (Exception e) {
- onError(sel, e);
- }
- }
- @Override
- public void onError(SelectorSelection sel, Throwable error) {
- Exception e = null;
- if (error instanceof Exception) {
- e = (Exception)error;
- } else {
- e = new Exception(error);
- }
- if (!isStopping()) {
- onAcceptError(e);
- } else if (!isStopped()) {
- LOG.warn("run()", e);
- onAcceptError(e);
+ e.printStackTrace();
+ if (!isStopping()) {
+ onAcceptError(e);
+ } else if (!isStopped()) {
+ LOG.warn("run()", e);
+ onAcceptError(e);
+ }
}
}
- });
- selector.setInterestOps(SelectionKey.OP_ACCEPT);
- selector.enable();
- } catch (IOException ex) {
- selector = null;
+ i.remove();
+ }
}
- } else {
- while (!isStopped()) {
- Socket socket = null;
- try {
- socket = serverSocket.accept();
- if (socket != null) {
- if (isStopped() || getAcceptListener() == null) {
- socket.close();
+ } catch (IOException ex) {
+ if (!isStopping()) {
+ onAcceptError(ex);
+ } else if (!isStopped()) {
+ LOG.warn("run()", ex);
+ onAcceptError(ex);
+ }
+ }
+ }
+
+ private void doRunWithServerSocket(final ServerSocket serverSocket) {
+ while (!isStopped()) {
+ Socket socket = null;
+ try {
+ socket = serverSocket.accept();
+ if (socket != null) {
+ if (isStopped() || getAcceptListener() == null) {
+ socket.close();
+ } else {
+ if (useQueueForAccept) {
+ socketQueue.put(socket);
} else {
- if (useQueueForAccept) {
- socketQueue.put(socket);
- } else {
- handleSocket(socket);
- }
+ handleSocket(socket);
}
}
- } catch (SocketTimeoutException ste) {
- // expect this to happen
- } catch (Exception e) {
- if (!isStopping()) {
- onAcceptError(e);
- } else if (!isStopped()) {
- LOG.warn("run()", e);
- onAcceptError(e);
- }
+ }
+ } catch (SocketTimeoutException ste) {
+ // expect this to happen
+ } catch (Exception e) {
+ if (!isStopping()) {
+ onAcceptError(e);
+ } else if (!isStopped()) {
+ LOG.warn("run()", e);
+ onAcceptError(e);
}
}
}
@@ -458,20 +495,43 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
- if (selector != null) {
- selector.disable();
- selector.close();
- selector = null;
+
+ Exception firstFailure = null;
+
+ try {
+ if (selector != null) {
+ selector.close();
+ selector = null;
+ }
+ } catch (Exception error) {
}
- if (serverSocket != null) {
- serverSocket.close();
- serverSocket = null;
+
+ try {
+ final ServerSocket serverSocket = this.serverSocket;
+ if (serverSocket != null) {
+ this.serverSocket = null;
+ serverSocket.close();
+ }
+ } catch (Exception error) {
+ firstFailure = error;
}
+
if (socketHandlerThread != null) {
socketHandlerThread.interrupt();
socketHandlerThread = null;
}
- super.doStop(stopper);
+
+ try {
+ super.doStop(stopper);
+ } catch (Exception error) {
+ if (firstFailure != null) {
+ firstFailure = error;
+ }
+ }
+
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
}
@Override