You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/05/13 15:29:36 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6291

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 3a4cdeb91 -> 52d2b1578


https://issues.apache.org/jira/browse/AMQ-6291

Better management of shared resources between the background run thread
and the main start / stop thread.  Makes sure to clean up all resources
before finally throwing on stop, to prevent resource leaks.
(cherry picked from commit ff99872263981982bb1ebce93c07bfb8a28d4a06)

Conflicts:
	activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52d2b157
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52d2b157
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52d2b157

Branch: refs/heads/activemq-5.13.x
Commit: 52d2b157808c4dd11fe39203e0caad44d4e64a86
Parents: 3a4cdeb
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 13 11:13:11 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 13 11:29:25 2016 -0400

----------------------------------------------------------------------
 .../transport/tcp/TcpTransportServer.java       | 194 ++++++++++++-------
 1 file changed, 127 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/52d2b157/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 43e6ae0..a4ed0ae 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -26,10 +26,14 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -47,8 +51,6 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.TransportServerThreadSupport;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -66,8 +68,8 @@ import org.slf4j.LoggerFactory;
 public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
-    protected ServerSocket serverSocket;
-    protected SelectorSelection selector;
+    protected volatile ServerSocket serverSocket;
+    protected volatile Selector selector;
     protected int backlog = 5000;
     protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
     protected final TcpTransportFactory transportFactory;
@@ -110,14 +112,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
      */
     protected boolean startLogging = true;
     protected final ServerSocketFactory serverSocketFactory;
-    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
     protected Thread socketHandlerThread;
 
     /**
      * The maximum number of sockets allowed for this server
      */
     protected int maximumConnections = Integer.MAX_VALUE;
-    protected AtomicInteger currentTransportCount = new AtomicInteger();
+    protected final AtomicInteger currentTransportCount = new AtomicInteger();
 
     public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
         URISyntaxException {
@@ -134,8 +136,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
         InetAddress addr = InetAddress.getByName(host);
 
         try {
-            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
-            configureServerSocket(this.serverSocket);
+            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
+            configureServerSocket(serverSocket);
         } catch (IOException e) {
             throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
         }
@@ -143,7 +145,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
             setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
                 bind.getQuery(), bind.getFragment()));
         } catch (URISyntaxException e) {
-
             // it could be that the host name contains invalid characters such
             // as _ on unix platforms so lets try use the IP address instead
             try {
@@ -299,15 +300,52 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
      */
     @Override
     public void run() {
-        final ServerSocketChannel chan = serverSocket.getChannel();
-        if (chan != null) {
+        if (!isStopped() && !isStopping()) {
+            final ServerSocket serverSocket = this.serverSocket;
+            if (serverSocket == null) {
+                onAcceptError(new IOException("Server started without a valid ServerSocket"));
+            }
+
+            final ServerSocketChannel channel = serverSocket.getChannel();
+            if (channel != null) {
+                doRunWithServerSocketChannel(channel);
+            } else {
+                doRunWithServerSocket(serverSocket);
+            }
+        }
+    }
+
+    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
+        try {
+            channel.configureBlocking(false);
+            final Selector selector = Selector.open();
+
             try {
-                chan.configureBlocking(false);
-                selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() {
-                    @Override
-                    public void onSelect(SelectorSelection sel) {
+                channel.register(selector, SelectionKey.OP_ACCEPT);
+            } catch (ClosedChannelException ex) {
+                try {
+                    selector.close();
+                } catch (IOException ignore) {}
+
+                throw ex;
+            }
+
+            // Update object instance for later cleanup.
+            this.selector = selector;
+
+            while (!isStopped()) {
+                int count = selector.select(10);
+                if (count == 0) {
+                    continue;
+                }
+
+                Set<SelectionKey> keys = selector.selectedKeys();
+
+                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
+                    final SelectionKey key = i.next();
+                    if (key.isAcceptable()) {
                         try {
-                            SocketChannel sc = chan.accept();
+                            SocketChannel sc = channel.accept();
                             if (sc != null) {
                                 if (isStopped() || getAcceptListener() == null) {
                                     sc.close();
@@ -319,56 +357,55 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
                                     }
                                 }
                             }
+                        } catch (SocketTimeoutException ste) {
+                            // expect this to happen
                         } catch (Exception e) {
-                            onError(sel, e);
-                        }
-                    }
-                    @Override
-                    public void onError(SelectorSelection sel, Throwable error) {
-                        Exception e = null;
-                        if (error instanceof Exception) {
-                            e = (Exception)error;
-                        } else {
-                            e = new Exception(error);
-                        }
-                        if (!isStopping()) {
-                            onAcceptError(e);
-                        } else if (!isStopped()) {
-                            LOG.warn("run()", e);
-                            onAcceptError(e);
+                            e.printStackTrace();
+                            if (!isStopping()) {
+                                onAcceptError(e);
+                            } else if (!isStopped()) {
+                                LOG.warn("run()", e);
+                                onAcceptError(e);
+                            }
                         }
                     }
-                });
-                selector.setInterestOps(SelectionKey.OP_ACCEPT);
-                selector.enable();
-            } catch (IOException ex) {
-                selector = null;
+                    i.remove();
+                }
             }
-        } else {
-            while (!isStopped()) {
-                Socket socket = null;
-                try {
-                    socket = serverSocket.accept();
-                    if (socket != null) {
-                        if (isStopped() || getAcceptListener() == null) {
-                            socket.close();
+        } catch (IOException ex) {
+            if (!isStopping()) {
+                onAcceptError(ex);
+            } else if (!isStopped()) {
+                LOG.warn("run()", ex);
+                onAcceptError(ex);
+            }
+        }
+    }
+
+    private void doRunWithServerSocket(final ServerSocket serverSocket) {
+        while (!isStopped()) {
+            Socket socket = null;
+            try {
+                socket = serverSocket.accept();
+                if (socket != null) {
+                    if (isStopped() || getAcceptListener() == null) {
+                        socket.close();
+                    } else {
+                        if (useQueueForAccept) {
+                            socketQueue.put(socket);
                         } else {
-                            if (useQueueForAccept) {
-                                socketQueue.put(socket);
-                            } else {
-                                handleSocket(socket);
-                            }
+                            handleSocket(socket);
                         }
                     }
-                } catch (SocketTimeoutException ste) {
-                    // expect this to happen
-                } catch (Exception e) {
-                    if (!isStopping()) {
-                        onAcceptError(e);
-                    } else if (!isStopped()) {
-                        LOG.warn("run()", e);
-                        onAcceptError(e);
-                    }
+                }
+            } catch (SocketTimeoutException ste) {
+                // expect this to happen
+            } catch (Exception e) {
+                if (!isStopping()) {
+                    onAcceptError(e);
+                } else if (!isStopped()) {
+                    LOG.warn("run()", e);
+                    onAcceptError(e);
                 }
             }
         }
@@ -458,20 +495,43 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
 
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
-        if (selector != null) {
-            selector.disable();
-            selector.close();
-            selector = null;
+
+        Exception firstFailure = null;
+
+        try {
+            if (selector != null) {
+                selector.close();
+                selector = null;
+            }
+        } catch (Exception error) {
         }
-        if (serverSocket != null) {
-            serverSocket.close();
-            serverSocket = null;
+
+        try {
+            final ServerSocket serverSocket = this.serverSocket;
+            if (serverSocket != null) {
+                this.serverSocket = null;
+                serverSocket.close();
+            }
+        } catch (Exception error) {
+            firstFailure = error;
         }
+
         if (socketHandlerThread != null) {
             socketHandlerThread.interrupt();
             socketHandlerThread = null;
         }
-        super.doStop(stopper);
+
+        try {
+            super.doStop(stopper);
+        } catch (Exception error) {
+            if (firstFailure != null) {
+                firstFailure = error;
+            }
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
     }
 
     @Override