You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/12/12 16:18:29 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6535

Repository: activemq
Updated Branches:
  refs/heads/master 29b4db5c3 -> 27238b2dd


https://issues.apache.org/jira/browse/AMQ-6535

Fixing the auto+nio+ssl transport so that the protocol detection task
will properly terminate on timeout and not continue to run. Also
lowered the default detection timeout to 15 seconds instead of 30
seconds to match the InactivityMonitor default.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/27238b2d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/27238b2d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/27238b2d

Branch: refs/heads/master
Commit: 27238b2dd790cea38d95da46b23ed141f5847782
Parents: 29b4db5
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Dec 12 11:15:43 2016 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Dec 12 11:17:19 2016 -0500

----------------------------------------------------------------------
 .../transport/auto/AutoTcpTransportServer.java  | 86 ++++++++++++++++----
 .../auto/nio/AutoNIOSSLTransportServer.java     | 33 ++++++--
 .../transport/nio/AutoInitNioSSLTransport.java  | 19 +----
 .../activemq/transport/nio/NIOSSLTransport.java | 18 ----
 4 files changed, 97 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/27238b2d/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
index 8eeb6ac..8b9a73f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
@@ -27,8 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -77,8 +75,10 @@ public class AutoTcpTransportServer extends TcpTransportServer {
 
     protected BrokerService brokerService;
 
+    protected final ThreadPoolExecutor newConnectionExecutor;
+    protected final ThreadPoolExecutor protocolDetectionExecutor;
     protected int maxConnectionThreadPoolSize = Integer.MAX_VALUE;
-    protected int protocolDetectionTimeOut = 30000;
+    protected int protocolDetectionTimeOut = 15000;
 
     private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
     private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>();
@@ -157,12 +157,21 @@ public class AutoTcpTransportServer extends TcpTransportServer {
 
         //Use an executor service here to handle new connections.  Setting the max number
         //of threads to the maximum number of connections the thread count isn't unbounded
-        service = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
+        newConnectionExecutor = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
                 maxConnectionThreadPoolSize,
                 30L, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<Runnable>());
         //allow the thread pool to shrink if the max number of threads isn't needed
-        service.allowCoreThreadTimeOut(true);
+        //and the pool can grow and shrink as needed if contention is high
+        newConnectionExecutor.allowCoreThreadTimeOut(true);
+
+        //Executor for waiting for bytes to detection of protocol
+        protocolDetectionExecutor = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
+                maxConnectionThreadPoolSize,
+                30L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>());
+        //allow the thread pool to shrink if the max number of threads isn't needed
+        protocolDetectionExecutor.allowCoreThreadTimeOut(true);
 
         this.brokerService = brokerService;
         this.enabledProtocols = enabledProtocols;
@@ -173,10 +182,32 @@ public class AutoTcpTransportServer extends TcpTransportServer {
         return maxConnectionThreadPoolSize;
     }
 
+    /**
+     * Set the number of threads to be used for processing connections.  Defaults
+     * to Integer.MAX_SIZE.  Set this value to be lower to reduce the
+     * number of simultaneous connection attempts.  If not set then the maximum number of
+     * threads will generally be controlled by the transport maxConnections setting:
+     * {@link TcpTransportServer#setMaximumConnections(int)}.
+     *<p>
+     * Note that this setter controls two thread pools because connection attempts
+     * require 1 thread to start processing the connection and another thread to read from the
+     * socket and to detect the protocol. Two threads are needed because some transports
+     * block on socket read so the first thread needs to be able to abort the second thread on timeout.
+     * Therefore this setting will set each thread pool to the size passed in essentially giving
+     * 2 times as many potential threads as the value set.
+     *<p>
+     * Both thread pools will close idle threads after a period of time
+     * essentially allowing the thread pools to grow and shrink dynamically based on load.
+     *
+     * @see {@link TcpTransportServer#setMaximumConnections(int)}.
+     * @param maxConnectionThreadPoolSize
+     */
     public void setMaxConnectionThreadPoolSize(int maxConnectionThreadPoolSize) {
         this.maxConnectionThreadPoolSize = maxConnectionThreadPoolSize;
-        service.setCorePoolSize(maxConnectionThreadPoolSize);
-        service.setMaximumPoolSize(maxConnectionThreadPoolSize);
+        newConnectionExecutor.setCorePoolSize(maxConnectionThreadPoolSize);
+        newConnectionExecutor.setMaximumPoolSize(maxConnectionThreadPoolSize);
+        protocolDetectionExecutor.setCorePoolSize(maxConnectionThreadPoolSize);
+        protocolDetectionExecutor.setMaximumPoolSize(maxConnectionThreadPoolSize);
     }
 
     public void setProtocolDetectionTimeOut(int protocolDetectionTimeOut) {
@@ -219,16 +250,13 @@ public class AutoTcpTransportServer extends TcpTransportServer {
         return enabledProtocols == null || enabledProtocols.isEmpty();
     }
 
-
-    protected final ThreadPoolExecutor service;
-
     @Override
     protected void handleSocket(final Socket socket) {
         final AutoTcpTransportServer server = this;
         //This needs to be done in a new thread because
         //the socket might be waiting on the client to send bytes
         //doHandleSocket can't complete until the protocol can be detected
-        service.submit(new Runnable() {
+        newConnectionExecutor.submit(new Runnable() {
             @Override
             public void run() {
                 server.doHandleSocket(socket);
@@ -239,30 +267,37 @@ public class AutoTcpTransportServer extends TcpTransportServer {
     @Override
     protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
         final InputStream is = socket.getInputStream();
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-
         final AtomicInteger readBytes = new AtomicInteger(0);
         final ByteBuffer data = ByteBuffer.allocate(8);
+
         // We need to peak at the first 8 bytes of the buffer to detect the protocol
-        Future<?> future = executor.submit(new Runnable() {
+        Future<?> future = protocolDetectionExecutor.submit(new Runnable() {
             @Override
             public void run() {
                 try {
                     do {
+                        //will block until enough bytes or read or a timeout
+                        //and the socket is closed
                         int read = is.read();
                         if (read == -1) {
                             throw new IOException("Connection failed, stream is closed.");
                         }
                         data.put((byte) read);
                         readBytes.incrementAndGet();
-                    } while (readBytes.get() < 8);
+                    } while (readBytes.get() < 8 && !Thread.interrupted());
                 } catch (Exception e) {
                     throw new IllegalStateException(e);
                 }
             }
         });
 
-        waitForProtocolDetectionFinish(future, readBytes);
+        try {
+            //If this fails and throws an exception and the socket will be closed
+            waitForProtocolDetectionFinish(future, readBytes);
+        } finally {
+            //call cancel in case task didn't complete
+            future.cancel(true);
+        }
         data.flip();
         ProtocolInfo protocolInfo = detectProtocol(data.array());
 
@@ -320,8 +355,23 @@ public class AutoTcpTransportServer extends TcpTransportServer {
     }
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
-        if (service != null) {
-            service.shutdown();
+        if (newConnectionExecutor != null) {
+            newConnectionExecutor.shutdownNow();
+            try {
+                if (!newConnectionExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
+                    LOG.warn("Auto Transport newConnectionExecutor didn't shutdown cleanly");
+                }
+            } catch (InterruptedException e) {
+            }
+        }
+        if (protocolDetectionExecutor != null) {
+            protocolDetectionExecutor.shutdownNow();
+            try {
+                if (!protocolDetectionExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
+                    LOG.warn("Auto Transport protocolDetectionExecutor didn't shutdown cleanly");
+                }
+            } catch (InterruptedException e) {
+            }
         }
         super.doStop(stopper);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/27238b2d/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
index cb38d7e..572352e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
@@ -7,8 +7,6 @@ import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import javax.net.ServerSocketFactory;
@@ -101,8 +99,6 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
 
     @Override
     protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-
         //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format
         //The wireformat doesn't need properties set here because we aren't using this format during the SSL handshake
         final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
@@ -117,17 +113,38 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
         in.start();
         SSLEngine engine = in.getSslSession();
 
-        Future<?> future = executor.submit(new Runnable() {
+        //Attempt to read enough bytes to detect the protocol until the timeout period
+        //is reached
+        Future<?> future = protocolDetectionExecutor.submit(new Runnable() {
             @Override
             public void run() {
-                //Wait for handshake to finish initializing
+                int attempts = 0;
                 do {
+                    if(attempts > 0) {
+                        try {
+                            //increase sleep period each attempt to prevent high cpu usage
+                            //if the client is hung and not sending bytes
+                            int sleep = attempts >= 1024 ? 1024 : 4 * attempts;
+                            Thread.sleep(sleep);
+                        } catch (InterruptedException e) {
+                            break;
+                        }
+                    }
+                    //In the future it might be better to register a nonblocking selector
+                    //to be told when bytes are ready
                     in.serviceRead();
-                } while(in.getReadSize().get() < 8);
+                    attempts++;
+                } while(in.getReadSize().get() < 8 && !Thread.interrupted());
             }
         });
 
-        waitForProtocolDetectionFinish(future, in.getReadSize());
+        try {
+            //If this fails and throws an exception and the socket will be closed
+            waitForProtocolDetectionFinish(future, in.getReadSize());
+        } finally {
+            //call cancel in case task didn't complete which will interrupt the task
+            future.cancel(true);
+        }
         in.stop();
 
         InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));

http://git-wip-us.apache.org/repos/asf/activemq/blob/27238b2d/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
index 0bf4145..449c7ae 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
@@ -135,21 +135,6 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
 
     }
 
-
-    @Override
-    protected void finishHandshake() throws Exception {
-        if (handshakeInProgress) {
-            handshakeInProgress = false;
-            nextFrameSize = -1;
-
-            // Once handshake completes we need to ask for the now real sslSession
-            // otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
-            // cipher suite.
-            sslSession = sslEngine.getSession();
-
-        }
-    }
-
     public SSLEngine getSslSession() {
         return this.sslEngine;
     }
@@ -180,6 +165,10 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
                 if (!plain.hasRemaining()) {
                     int readCount = secureRead(plain);
 
+                    if (readCount == 0) {
+                        break;
+                    }
+
                     // channel is closed, cleanup
                     if (readCount == -1) {
                         onException(new EOFException());

http://git-wip-us.apache.org/repos/asf/activemq/blob/27238b2d/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index 3bceb0e..64e96be 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -156,7 +156,6 @@ public class NIOSSLTransport extends NIOTransport {
                 doHandshake();
             }
 
-           // if (hasSslEngine) {
             selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
                 @Override
                 public void onSelect(SelectorSelection selection) {
@@ -233,23 +232,6 @@ public class NIOSSLTransport extends NIOTransport {
             // otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
             // cipher suite.
             sslSession = sslEngine.getSession();
-
-            // listen for events telling us when the socket is readable.
-            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
-                @Override
-                public void onSelect(SelectorSelection selection) {
-                    serviceRead();
-                }
-
-                @Override
-                public void onError(SelectorSelection selection, Throwable error) {
-                    if (error instanceof IOException) {
-                        onException((IOException) error);
-                    } else {
-                        onException(IOExceptionSupport.create(error));
-                    }
-                }
-            });
         }
     }