You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/05/13 15:13:34 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6209

Repository: activemq
Updated Branches:
  refs/heads/master d7b5a62bb -> ff9987226


https://issues.apache.org/jira/browse/AMQ-6209

Better management of shared resources between the background run thread
and the main start / stop thread.  Makes sure to clean up all resources
before finally throwing on stop to prevent leaking any resources.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ff998722
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ff998722
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ff998722

Branch: refs/heads/master
Commit: ff99872263981982bb1ebce93c07bfb8a28d4a06
Parents: d7b5a62
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 13 11:13:11 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 13 11:13:28 2016 -0400

----------------------------------------------------------------------
 .../transport/tcp/TcpTransportServer.java       | 214 ++++++++++++-------
 1 file changed, 132 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ff998722/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index f071522..33d9c72 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -26,6 +26,7 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
@@ -50,8 +51,6 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.TransportServerThreadSupport;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -69,8 +68,9 @@ import org.slf4j.LoggerFactory;
 public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
-    protected ServerSocket serverSocket;
-    protected Selector selector;
+
+    protected volatile ServerSocket serverSocket;
+    protected volatile Selector selector;
     protected int backlog = 5000;
     protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
     protected final TcpTransportFactory transportFactory;
@@ -113,14 +113,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
      */
     protected boolean startLogging = true;
     protected final ServerSocketFactory serverSocketFactory;
-    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
     protected Thread socketHandlerThread;
 
     /**
      * The maximum number of sockets allowed for this server
      */
     protected int maximumConnections = Integer.MAX_VALUE;
-    protected AtomicInteger currentTransportCount = new AtomicInteger();
+    protected final AtomicInteger currentTransportCount = new AtomicInteger();
 
     public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
         URISyntaxException {
@@ -137,8 +137,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
         InetAddress addr = InetAddress.getByName(host);
 
         try {
-            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
-            configureServerSocket(this.serverSocket);
+            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+            configureServerSocket(serverSocket);
         } catch (IOException e) {
             throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
         }
@@ -146,7 +146,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
             setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
                 bind.getQuery(), bind.getFragment()));
         } catch (URISyntaxException e) {
-
             // it could be that the host name contains invalid characters such
             // as _ on unix platforms so lets try use the IP address instead
             try {
@@ -302,87 +301,114 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
      */
     @Override
     public void run() {
-        final ServerSocketChannel chan = serverSocket.getChannel();
-        if (chan != null) {
+        if (!isStopped() && !isStopping()) {
+            final ServerSocket serverSocket = this.serverSocket; // local snapshot; doStop() sets the field to null
+            if (serverSocket == null) {
+                onAcceptError(new IOException("Server started without a valid ServerSocket"));
+                return; // nothing to accept on; continuing would dereference the null socket
+            }
+            final ServerSocketChannel channel = serverSocket.getChannel();
+            if (channel != null) {
+                doRunWithServerSocketChannel(channel);
+            } else {
+                doRunWithServerSocket(serverSocket);
+            }
+        }
+    }
+
+    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
+        // Accept loop for channel-backed server sockets: polls a Selector in 10 ms slices until isStopped().
+        try {
+            channel.configureBlocking(false);
+            final Selector selector = Selector.open(); // kept local until registration succeeds
             try {
-                chan.configureBlocking(false);
-                selector = Selector.open();
-                chan.register(selector, SelectionKey.OP_ACCEPT);
-                while (!isStopped()) {
-                    int count = selector.select(10);
-
-                    if (count == 0) {
-                        continue;
-                    }
+                channel.register(selector, SelectionKey.OP_ACCEPT);
+            } catch (ClosedChannelException ex) {
+                try {
+                    selector.close();
+                } catch (IOException ignore) {}
 
-                    Set<SelectionKey> keys = selector.selectedKeys();
+                throw ex;
+            }
+
+            // Update object instance for later cleanup.
+            this.selector = selector;
+
+            while (!isStopped()) {
+                int count = selector.select(10);
+
+                if (count == 0) {
+                    continue;
+                }
 
-                    for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
-                        final SelectionKey key = i.next();
-                        if (key.isAcceptable()) {
-                            try {
-                                SocketChannel sc = chan.accept();
-                                if (sc != null) {
-                                    if (isStopped() || getAcceptListener() == null) {
-                                        sc.close();
+                Set<SelectionKey> keys = selector.selectedKeys();
+
+                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
+                    final SelectionKey key = i.next();
+                    if (key.isAcceptable()) {
+                        try {
+                            SocketChannel sc = channel.accept();
+                            if (sc != null) {
+                                if (isStopped() || getAcceptListener() == null) {
+                                    sc.close();
+                                } else {
+                                    if (useQueueForAccept) {
+                                        socketQueue.put(sc.socket());
                                     } else {
-                                        if (useQueueForAccept) {
-                                            socketQueue.put(sc.socket());
-                                        } else {
-                                            handleSocket(sc.socket());
-                                        }
+                                        handleSocket(sc.socket());
                                     }
                                 }
+                            }
 
-                            } catch (SocketTimeoutException ste) {
-                                // expect this to happen
-                            } catch (Exception e) {
-                                e.printStackTrace();
-                                if (!isStopping()) {
-                                    onAcceptError(e);
-                                } else if (!isStopped()) {
-                                    LOG.warn("run()", e);
-                                    onAcceptError(e);
-                                }
+                        } catch (SocketTimeoutException ste) {
+                            // expect this to happen
+                        } catch (Exception e) {
+                            LOG.debug("Error while accepting connection", e);
+                            if (!isStopping()) {
+                                onAcceptError(e);
+                            } else if (!isStopped()) {
+                                LOG.warn("run()", e);
+                                onAcceptError(e);
                             }
                         }
-                        i.remove();
                     }
-
-                }
-            } catch (IOException ex) {
-                if (selector != null) {
-                    try {
-                        selector.close();
-                    } catch (IOException ioe) {}
-                    selector = null;
+                    i.remove();
                 }
             }
-        } else {
-            while (!isStopped()) {
-                Socket socket = null;
-                try {
-                    socket = serverSocket.accept();
-                    if (socket != null) {
-                        if (isStopped() || getAcceptListener() == null) {
-                            socket.close();
+        } catch (IOException ex) {
+            if (!isStopping()) {
+                onAcceptError(ex);
+            } else if (!isStopped()) {
+                LOG.warn("run()", ex);
+                onAcceptError(ex);
+            }
+        }
+    }
+
+    private void doRunWithServerSocket(final ServerSocket serverSocket) {
+        while (!isStopped()) {
+            Socket socket = null;
+            try {
+                socket = serverSocket.accept();
+                if (socket != null) {
+                    if (isStopped() || getAcceptListener() == null) {
+                        socket.close();
+                    } else {
+                        if (useQueueForAccept) {
+                            socketQueue.put(socket);
                         } else {
-                            if (useQueueForAccept) {
-                                socketQueue.put(socket);
-                            } else {
-                                handleSocket(socket);
-                            }
+                            handleSocket(socket);
                         }
                     }
-                } catch (SocketTimeoutException ste) {
-                    // expect this to happen
-                } catch (Exception e) {
-                    if (!isStopping()) {
-                        onAcceptError(e);
-                    } else if (!isStopped()) {
-                        LOG.warn("run()", e);
-                        onAcceptError(e);
-                    }
+                }
+            } catch (SocketTimeoutException ste) {
+                // expect this to happen
+            } catch (Exception e) {
+                if (!isStopping()) {
+                    onAcceptError(e);
+                } else if (!isStopped()) {
+                    LOG.warn("run()", e);
+                    onAcceptError(e);
                 }
             }
         }
@@ -472,19 +498,43 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
 
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
-        if (selector != null) {
-            selector.close();
-            selector = null;
+        Exception firstFailure = null; // earliest cleanup error; rethrown only after every resource is released
+        try {
+            final Selector selector = this.selector;
+            if (selector != null) {
+                this.selector = null; // cleared before close() so the field is null even if close() throws
+                selector.close();
+            }
+        } catch (Exception error) {
+            firstFailure = error;
         }
-        if (serverSocket != null) {
-            serverSocket.close();
-            serverSocket = null;
+
+        try {
+            final ServerSocket serverSocket = this.serverSocket;
+            if (serverSocket != null) {
+                this.serverSocket = null;
+                serverSocket.close();
+            }
+        } catch (Exception error) {
+            firstFailure = (firstFailure != null) ? firstFailure : error; // keep an earlier failure if there is one
         }
+
         if (socketHandlerThread != null) {
             socketHandlerThread.interrupt();
             socketHandlerThread = null;
         }
-        super.doStop(stopper);
+
+        try {
+            super.doStop(stopper);
+        } catch (Exception error) {
+            if (firstFailure == null) { // record only when nothing failed earlier
+                firstFailure = error;
+            }
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
     }
 
     @Override