You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2022/02/18 17:47:33 UTC

[tomcat] branch 8.5.x updated: Align with 9.0.x onwards - address BZ 65408 and BZ 65755

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new 01f2cf2  Align with 9.0.x onwards - address BZ 65408 and BZ 65755
01f2cf2 is described below

commit 01f2cf25b270a84d0daeefc4f215aa2f56e1df99
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Fri Feb 18 17:47:22 2022 +0000

    Align with 9.0.x onwards - address BZ 65408 and BZ 65755
    
    Things reached the point where the remaining patches to back-port were
    so intertwined I was having difficulty unpicking them. I also needed to
    back-port a few additional changes I had been trying not to back-port
    because of backwards compatibility issues for users extending the
    Endpoint classes.
    
    If users extending (or possibly using) Endpoint classes experience
    compatibility issues, then my plan is to retrofit fixes for any specific
    issues observed.
    
    The changes covered by this commit include:
    - Refactoring to introduce a common Acceptor class.
    - Pulling up the Socket/SocketWrapper map to the AbstractEndpoint
    - Refactoring socket close so it is controlled by the SocketWrapper
    - Completing the introduction of using dummy channels, buffers etc once
      the socket has been closed
    
    The tests currently all pass - hence this commit - but there is still a
    little more work to do for i18n, reviewing diffs and back-porting a few
    additional features like portOffset and graceful close.
    
    https://bz.apache.org/bugzilla/show_bug.cgi?id=65408
    https://bz.apache.org/bugzilla/show_bug.cgi?id=65755
---
 .../catalina/security/SecurityClassLoad.java       |   3 -
 java/org/apache/coyote/AbstractProtocol.java       |  38 +-
 .../apache/tomcat/util/net/AbstractEndpoint.java   | 277 ++++++-----
 java/org/apache/tomcat/util/net/Acceptor.java      | 252 ++++++++++
 java/org/apache/tomcat/util/net/AprEndpoint.java   | 360 +++++--------
 java/org/apache/tomcat/util/net/Nio2Channel.java   |  12 +-
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  | 348 ++++++-------
 .../tomcat/util/net/NioBlockingSelector.java       | 542 --------------------
 java/org/apache/tomcat/util/net/NioChannel.java    |  33 +-
 java/org/apache/tomcat/util/net/NioEndpoint.java   | 554 +++++++++------------
 .../apache/tomcat/util/net/NioSelectorPool.java    | 378 --------------
 .../apache/tomcat/util/net/SecureNio2Channel.java  |  18 +-
 .../apache/tomcat/util/net/SecureNioChannel.java   |  37 +-
 .../apache/tomcat/util/net/SocketWrapperBase.java  |  26 +-
 .../server/WsRemoteEndpointImplServer.java         | 109 ++--
 15 files changed, 1088 insertions(+), 1899 deletions(-)

diff --git a/java/org/apache/catalina/security/SecurityClassLoad.java b/java/org/apache/catalina/security/SecurityClassLoad.java
index e24aae1..5def1f0 100644
--- a/java/org/apache/catalina/security/SecurityClassLoad.java
+++ b/java/org/apache/catalina/security/SecurityClassLoad.java
@@ -183,9 +183,6 @@ public final class SecurityClassLoad {
         // net
         loader.loadClass(basePackage + "util.net.Constants");
         loader.loadClass(basePackage + "util.net.DispatchType");
-        loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableAdd");
-        loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableCancel");
-        loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableRemove");
         loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$AprOperationState");
         loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$NioOperationState");
         loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$Nio2OperationState");
diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java
index d3f83a7..5b10150 100644
--- a/java/org/apache/coyote/AbstractProtocol.java
+++ b/java/org/apache/coyote/AbstractProtocol.java
@@ -19,7 +19,7 @@ package org.apache.coyote;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.Map;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -717,7 +717,6 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
         private final AbstractProtocol<S> proto;
         private final RequestGroupInfo global = new RequestGroupInfo();
         private final AtomicLong registerCount = new AtomicLong(0);
-        private final Map<S,Processor> connections = new ConcurrentHashMap<>();
         private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
 
         public ConnectionHandler(AbstractProtocol<S> proto) {
@@ -756,7 +755,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
             S socket = wrapper.getSocket();
 
-            Processor processor = connections.get(socket);
+            Processor processor = (Processor) wrapper.getCurrentProcessor();
             if (getLog().isDebugEnabled()) {
                 getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                         processor, socket));
@@ -840,7 +839,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                         wrapper.getSslSupport(getProtocol().getClientCertProvider()));
 
                 // Associate the processor with the connection
-                connections.put(socket, processor);
+                wrapper.setCurrentProcessor(processor);
 
                 SocketState state = SocketState.CLOSED;
                 do {
@@ -862,7 +861,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                                 // Create the upgrade processor
                                 processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
                                 // Associate with the processor with the connection
-                                connections.put(socket, processor);
+                                wrapper.setCurrentProcessor(processor);
                             } else {
                                 if (getLog().isDebugEnabled()) {
                                     getLog().debug(sm.getString(
@@ -883,7 +882,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                                         processor, wrapper));
                             }
                             // Associate with the processor with the connection
-                            connections.put(socket, processor);
+                            wrapper.setCurrentProcessor(processor);
                             // Initialise the upgrade handler (which may trigger
                             // some IO using the new protocol which is why the lines
                             // above are necessary)
@@ -915,7 +914,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                 } else if (state == SocketState.OPEN) {
                     // In keep-alive but between requests. OK to recycle
                     // processor. Continue to poll for the next request.
-                    connections.remove(socket);
+                    wrapper.setCurrentProcessor(null);
                     release(processor);
                     wrapper.registerReadInterest();
                 } else if (state == SocketState.SENDFILE) {
@@ -941,7 +940,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                     // Connection closed. OK to recycle the processor.
                     // Processors handling upgrades require additional clean-up
                     // before release.
-                    connections.remove(socket);
+                    wrapper.setCurrentProcessor(null);
                     if (processor.isUpgrade()) {
                         UpgradeToken upgradeToken = processor.getUpgradeToken();
                         HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
@@ -999,7 +998,7 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
             // Make sure socket/processor is removed from the list of current
             // connections
-            connections.remove(socket);
+            wrapper.setCurrentProcessor(null);
             release(processor);
             return SocketState.CLOSED;
         }
@@ -1019,7 +1018,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
         @Override
         public Set<S> getOpenSockets() {
-            return connections.keySet();
+            Set<SocketWrapperBase<S>> set = proto.getEndpoint().getConnections();
+            Set<S> result = new HashSet<>();
+            for (SocketWrapperBase<S> socketWrapper : set) {
+                S socket = socketWrapper.getSocket();
+                if (socket != null) {
+                    result.add(socket);
+                }
+            }
+            return result;
         }
 
 
@@ -1063,8 +1070,8 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
          */
         @Override
         public void release(SocketWrapperBase<S> socketWrapper) {
-            S socket = socketWrapper.getSocket();
-            Processor processor = connections.remove(socket);
+            Processor processor = (Processor) socketWrapper.getCurrentProcessor();
+            socketWrapper.setCurrentProcessor(null);
             release(processor);
         }
 
@@ -1132,8 +1139,11 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
              * Note that even if the endpoint is resumed, there is (currently)
              * no API to inform the Processors of this.
              */
-            for (Processor processor : connections.values()) {
-                processor.pause();
+            for (SocketWrapperBase<S> wrapper : proto.getEndpoint().getConnections()) {
+                Processor processor = (Processor) wrapper.getCurrentProcessor();
+                if (processor != null) {
+                    processor.pause();
+                }
             }
         }
     }
diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
index 18b7f94..a580211 100644
--- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
@@ -25,8 +25,10 @@ import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -42,7 +44,7 @@ import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.modeler.Registry;
-import org.apache.tomcat.util.net.AbstractEndpoint.Acceptor.AcceptorState;
+import org.apache.tomcat.util.net.Acceptor.AcceptorState;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.threads.LimitLatch;
 import org.apache.tomcat.util.threads.ResizableExecutor;
@@ -102,7 +104,10 @@ public abstract class AbstractEndpoint<S,U> {
          *
          * @return The sockets for which the handler is tracking a currently
          *         open connection
+         * @deprecated Unused, will be removed in Tomcat 10, replaced
+         *         by AbstractEndpoint.getConnections
          */
+        @Deprecated
         public Set<S> getOpenSockets();
 
         /**
@@ -129,32 +134,12 @@ public abstract class AbstractEndpoint<S,U> {
     }
 
     protected enum BindState {
-        UNBOUND, BOUND_ON_INIT, BOUND_ON_START, SOCKET_CLOSED_ON_STOP
+        UNBOUND,
+        BOUND_ON_INIT,
+        BOUND_ON_START,
+        SOCKET_CLOSED_ON_STOP
     }
 
-    public abstract static class Acceptor implements Runnable {
-        public enum AcceptorState {
-            NEW, RUNNING, PAUSED, ENDED
-        }
-
-        protected volatile AcceptorState state = AcceptorState.NEW;
-        public final AcceptorState getState() {
-            return state;
-        }
-
-        private String threadName;
-        protected final void setThreadName(final String threadName) {
-            this.threadName = threadName;
-        }
-        protected final String getThreadName() {
-            return threadName;
-        }
-    }
-
-
-    private static final int INITIAL_ERROR_DELAY = 50;
-    private static final int MAX_ERROR_DELAY = 1600;
-
 
     public static long toTimeout(long timeout) {
         // Many calls can't do infinite timeout so use Long.MAX_VALUE if timeout is <= 0
@@ -188,16 +173,15 @@ public abstract class AbstractEndpoint<S,U> {
     /**
      * Socket properties
      */
-    protected SocketProperties socketProperties = new SocketProperties();
+    protected final SocketProperties socketProperties = new SocketProperties();
     public SocketProperties getSocketProperties() {
         return socketProperties;
     }
 
     /**
      * Thread used to accept new connections and pass them to worker threads.
-     * This is hard-coded to use a single acceptor.
      */
-    protected Acceptor[] acceptors;
+    protected Acceptor<U> acceptor;
 
     /**
      * Cache for SocketProcessor objects
@@ -206,6 +190,19 @@ public abstract class AbstractEndpoint<S,U> {
 
     private ObjectName oname = null;
 
+    /**
+     * Map holding all current connections keyed with the sockets.
+     */
+    protected Map<U, SocketWrapperBase<S>> connections = new ConcurrentHashMap<>();
+
+    /**
+     * Get a set with the current open connections.
+     * @return A set with the open socket wrappers
+     */
+    public Set<SocketWrapperBase<S>> getConnections() {
+        return new HashSet<>(connections.values());
+    }
+
     // ----------------------------------------------------------------- Properties
 
     private String defaultSSLHostConfigName = SSLHostConfig.DEFAULT_SSL_HOST_NAME;
@@ -494,7 +491,7 @@ public abstract class AbstractEndpoint<S,U> {
     public int getAcceptorThreadPriority() { return acceptorThreadPriority; }
 
 
-    private int maxConnections = 10000;
+    private int maxConnections = 8*1024;
     public void setMaxConnections(int maxCon) {
         this.maxConnections = maxCon;
         LimitLatch latch = this.connectionLimitLatch;
@@ -982,17 +979,11 @@ public abstract class AbstractEndpoint<S,U> {
     }
 
     /**
-     * Unlock the server socket accept using a bogus connection.
+     * Unlock the server socket acceptor threads using bogus connections.
      */
     protected void unlockAccept() {
         // Only try to unlock the acceptor if it is necessary
-        int unlocksRequired = 0;
-        for (Acceptor acceptor : acceptors) {
-            if (acceptor.getState() == AcceptorState.RUNNING) {
-                unlocksRequired++;
-            }
-        }
-        if (unlocksRequired == 0) {
+        if (acceptor == null || acceptor.getState() != AcceptorState.RUNNING) {
             return;
         }
 
@@ -1011,49 +1002,36 @@ public abstract class AbstractEndpoint<S,U> {
         try {
             unlockAddress = getUnlockAddress(localAddress);
 
-            for (int i = 0; i < unlocksRequired; i++) {
-                try (java.net.Socket s = new java.net.Socket()) {
-                    int stmo = 2 * 1000;
-                    int utmo = 2 * 1000;
-                    if (getSocketProperties().getSoTimeout() > stmo) {
-                        stmo = getSocketProperties().getSoTimeout();
-                    }
-                    if (getSocketProperties().getUnlockTimeout() > utmo) {
-                        utmo = getSocketProperties().getUnlockTimeout();
-                    }
-                    s.setSoTimeout(stmo);
-                    s.setSoLinger(getSocketProperties().getSoLingerOn(),getSocketProperties().getSoLingerTime());
-                    if (getLog().isDebugEnabled()) {
-                        getLog().debug("About to unlock socket for:" + unlockAddress);
-                    }
-                    s.connect(unlockAddress,utmo);
-                    if (getDeferAccept()) {
-                        /*
-                         * In the case of a deferred accept / accept filters we need to
-                         * send data to wake up the accept. Send OPTIONS * to bypass
-                         * even BSD accept filters. The Acceptor will discard it.
-                         */
-                        OutputStreamWriter sw;
-
-                        sw = new OutputStreamWriter(s.getOutputStream(), "ISO-8859-1");
-                        sw.write("OPTIONS * HTTP/1.0\r\n" +
-                                 "User-Agent: Tomcat wakeup connection\r\n\r\n");
-                        sw.flush();
-                    }
-                    if (getLog().isDebugEnabled()) {
-                        getLog().debug("Socket unlock completed for:" + unlockAddress);
-                    }
+            try (java.net.Socket s = new java.net.Socket()) {
+                int stmo = 2 * 1000;
+                int utmo = 2 * 1000;
+                if (getSocketProperties().getSoTimeout() > stmo) {
+                    stmo = getSocketProperties().getSoTimeout();
                 }
-            }
-            // Wait for upto 1000ms acceptor threads to unlock
-            // Should only be one thread but retain this code in case the
-            // acceptor start has been customised.
-            long waitLeft = 1000;
-            for (Acceptor acceptor : acceptors) {
-                while (waitLeft > 0 &&
-                        acceptor.getState() == AcceptorState.RUNNING) {
-                    Thread.sleep(5);
-                    waitLeft -= 5;
+                if (getSocketProperties().getUnlockTimeout() > utmo) {
+                    utmo = getSocketProperties().getUnlockTimeout();
+                }
+                s.setSoTimeout(stmo);
+                s.setSoLinger(getSocketProperties().getSoLingerOn(),getSocketProperties().getSoLingerTime());
+                if (getLog().isDebugEnabled()) {
+                    getLog().debug("About to unlock socket for:" + unlockAddress);
+                }
+                s.connect(unlockAddress,utmo);
+                if (getDeferAccept()) {
+                    /*
+                     * In the case of a deferred accept / accept filters we need to
+                     * send data to wake up the accept. Send OPTIONS * to bypass
+                     * even BSD accept filters. The Acceptor will discard it.
+                     */
+                    OutputStreamWriter sw;
+
+                    sw = new OutputStreamWriter(s.getOutputStream(), "ISO-8859-1");
+                    sw.write("OPTIONS * HTTP/1.0\r\n" +
+                            "User-Agent: Tomcat wakeup connection\r\n\r\n");
+                    sw.flush();
+                }
+                if (getLog().isDebugEnabled()) {
+                    getLog().debug("Socket unlock completed for:" + unlockAddress);
                 }
             }
             // Wait for up to 1000ms acceptor threads to unlock. Particularly
@@ -1063,17 +1041,16 @@ public abstract class AbstractEndpoint<S,U> {
             // initially wait for the unlock in a tight loop but if that takes
             // more than 1ms we start using short sleeps to reduce CPU usage.
             long startTime = System.nanoTime();
-            for (Acceptor acceptor : acceptors) {
-                while (startTime + 1_000_000_000 > System.nanoTime() && acceptor.getState() == AcceptorState.RUNNING) {
-                    if (startTime + 1_000_000 < System.nanoTime()) {
-                        Thread.sleep(1);
-                    }
+            while (startTime + 1_000_000_000 > System.nanoTime() && acceptor.getState() == AcceptorState.RUNNING) {
+                if (startTime + 1_000_000 < System.nanoTime()) {
+                    Thread.sleep(1);
                 }
             }
         } catch(Throwable t) {
             ExceptionUtils.handleThrowable(t);
             if (getLog().isDebugEnabled()) {
-                getLog().debug(sm.getString("endpoint.debug.unlock.fail", String.valueOf(getPort())), t);
+                getLog().debug(sm.getString(
+                        "endpoint.debug.unlock.fail", String.valueOf(getPort())), t);
             }
         }
     }
@@ -1193,9 +1170,23 @@ public abstract class AbstractEndpoint<S,U> {
     public abstract void startInternal() throws Exception;
     public abstract void stopInternal() throws Exception;
 
+
+    private void bindWithCleanup() throws Exception {
+        try {
+            bind();
+        } catch (Throwable t) {
+            // Ensure open sockets etc. are cleaned up if something goes
+            // wrong during bind
+            ExceptionUtils.handleThrowable(t);
+            unbind();
+            throw t;
+        }
+    }
+
+
     public void init() throws Exception {
         if (bindOnInit) {
-            bind();
+            bindWithCleanup();
             bindState = BindState.BOUND_ON_INIT;
         }
         if (this.domain != null) {
@@ -1268,19 +1259,18 @@ public abstract class AbstractEndpoint<S,U> {
 
     public final void start() throws Exception {
         if (bindState == BindState.UNBOUND) {
-            bind();
+            bindWithCleanup();
             bindState = BindState.BOUND_ON_START;
         }
         startInternal();
     }
 
-    protected final void startAcceptorThreads() {
-        acceptors = new Acceptor[1];
 
-        acceptors[0] = createAcceptor();
-        String threadName = getName() + "-Acceptor-0";
-        acceptors[0].setThreadName(threadName);
-        Thread t = new Thread(acceptors[0], threadName);
+    protected void startAcceptorThread() {
+        acceptor = new Acceptor<>(this);
+        String threadName = getName() + "-Acceptor";
+        acceptor.setThreadName(threadName);
+        Thread t = new Thread(acceptor, threadName);
         t.setPriority(getAcceptorThreadPriority());
         t.setDaemon(getDaemon());
         t.start();
@@ -1288,18 +1278,13 @@ public abstract class AbstractEndpoint<S,U> {
 
 
     /**
-     * Hook to allow Endpoints to provide a specific Acceptor implementation.
-     * @return the acceptor
-     */
-    protected abstract Acceptor createAcceptor();
-
-
-    /**
-     * Pause the endpoint, which will stop it accepting new connections.
+     * Pause the endpoint, which will stop it accepting new connections and
+     * unlock the acceptor.
      */
     public void pause() {
         if (running && !paused) {
             paused = true;
+            releaseConnectionLatch();
             unlockAccept();
             getHandler().pause();
         }
@@ -1383,37 +1368,6 @@ public abstract class AbstractEndpoint<S,U> {
         }
     }
 
-    /**
-     * Provides a common approach for sub-classes to handle exceptions where a
-     * delay is required to prevent a Thread from entering a tight loop which
-     * will consume CPU and may also trigger large amounts of logging. For
-     * example, this can happen with the Acceptor thread if the ulimit for open
-     * files is reached.
-     *
-     * @param currentErrorDelay The current delay being applied on failure
-     * @return  The delay to apply on the next failure
-     */
-    protected int handleExceptionWithDelay(int currentErrorDelay) {
-        // Don't delay on first exception
-        if (currentErrorDelay > 0) {
-            try {
-                Thread.sleep(currentErrorDelay);
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-        }
-
-        // On subsequent exceptions, start the delay at 50ms, doubling the delay
-        // on every subsequent exception until the delay reaches 1.6 seconds.
-        if (currentErrorDelay == 0) {
-            return INITIAL_ERROR_DELAY;
-        } else if (currentErrorDelay < MAX_ERROR_DELAY) {
-            return currentErrorDelay * 2;
-        } else {
-            return MAX_ERROR_DELAY;
-        }
-    }
-
 
     /**
      * Close the server socket (to prevent further connections) if the server
@@ -1424,6 +1378,16 @@ public abstract class AbstractEndpoint<S,U> {
      */
     public final void closeServerSocketGraceful() {
         if (bindState == BindState.BOUND_ON_START) {
+            // Stop accepting new connections
+            acceptor.stop(-1);
+            // Release locks that may be preventing the acceptor from stopping
+            releaseConnectionLatch();
+            unlockAccept();
+            // Signal to any multiplexed protocols (HTTP/2) that they may wish
+            // to stop accepting new streams
+            getHandler().pause();
+            // Update the bindState. This has the side-effect of disabling
+            // keep-alive for any in-progress connections
             bindState = BindState.SOCKET_CLOSED_ON_STOP;
             try {
                 doCloseServerSocket();
@@ -1435,11 +1399,60 @@ public abstract class AbstractEndpoint<S,U> {
 
 
     /**
+     * Wait for the client connections to the server to close gracefully. The
+     * method will return when all of the client connections have closed or the
+     * method has been waiting for {@code waitMillis}.
+     *
+     * @param waitMillis    The maximum time to wait in milliseconds for the
+     *                      client connections to close.
+     *
+     * @return The wait time, if any remaining when the method returned
+     */
+    public final long awaitConnectionsClose(long waitMillis) {
+        while (waitMillis > 0 && !connections.isEmpty()) {
+            try {
+                Thread.sleep(50);
+                waitMillis -= 50;
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                waitMillis = 0;
+            }
+        }
+        return waitMillis;
+    }
+
+
+    /**
      * Actually close the server socket but don't perform any other clean-up.
      *
      * @throws IOException If an error occurs closing the socket
      */
     protected abstract void doCloseServerSocket() throws IOException;
 
+    protected abstract U serverSocketAccept() throws Exception;
+
+    protected abstract boolean setSocketOptions(U socket);
+
+    /**
+     * Close the socket when the connection has to be immediately closed when
+     * an error occurs while configuring the accepted socket or trying to
+     * dispatch it for processing. The wrapper associated with the socket will
+     * be used for the close.
+     * @param socket The newly accepted socket
+     */
+    protected void closeSocket(U socket) {
+        SocketWrapperBase<S> socketWrapper = connections.get(socket);
+        if (socketWrapper != null) {
+            socketWrapper.close();
+        }
+    }
+
+    /**
+     * Close the socket. This is used when the connector is not in a state
+     * which allows processing the socket, or if there was an error which
+     * prevented the allocation of the socket wrapper.
+     * @param socket The newly accepted socket
+     */
+    protected abstract void destroySocket(U socket);
 }
 
diff --git a/java/org/apache/tomcat/util/net/Acceptor.java b/java/org/apache/tomcat/util/net/Acceptor.java
new file mode 100644
index 0000000..3fa4f96
--- /dev/null
+++ b/java/org/apache/tomcat/util/net/Acceptor.java
@@ -0,0 +1,295 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.jni.Error;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.res.StringManager;
+
+/**
+ * Background task that accepts incoming connections on behalf of an
+ * {@link AbstractEndpoint} and hands each accepted socket back to that
+ * endpoint for configuration and processing.
+ *
+ * @param <U> The type of the socket returned by the endpoint when a
+ *            connection is accepted
+ */
+public class Acceptor<U> implements Runnable {
+
+    private static final Log log = LogFactory.getLog(Acceptor.class);
+    private static final StringManager sm = StringManager.getManager(Acceptor.class);
+
+    // Bounds, in milliseconds, of the back-off applied after repeated accept
+    // failures. See handleExceptionWithDelay(int).
+    private static final int INITIAL_ERROR_DELAY = 50;
+    private static final int MAX_ERROR_DELAY = 1600;
+
+    private final AbstractEndpoint<?,U> endpoint;
+    private String threadName;
+    /*
+     * Tracked separately rather than using endpoint.isRunning() as calls to
+     * endpoint.stop() and endpoint.start() in quick succession can cause the
+     * acceptor to continue running when it should terminate.
+     */
+    private volatile boolean stopCalled = false;
+    // Released when run() exits so that stop(int) can wait for the accept loop
+    // to finish.
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+    protected volatile AcceptorState state = AcceptorState.NEW;
+
+
+    /**
+     * Creates an acceptor for the given endpoint.
+     *
+     * @param endpoint The endpoint whose server socket will be accepted from
+     */
+    public Acceptor(AbstractEndpoint<?,U> endpoint) {
+        this.endpoint = endpoint;
+    }
+
+
+    /**
+     * @return The current state of this acceptor
+     */
+    public final AcceptorState getState() {
+        return state;
+    }
+
+
+    /**
+     * Sets the name reported for this acceptor, e.g. in logged warnings.
+     *
+     * @param threadName The name to report
+     */
+    final void setThreadName(final String threadName) {
+        this.threadName = threadName;
+    }
+
+
+    /**
+     * @return The name previously supplied via {@link #setThreadName(String)}
+     */
+    final String getThreadName() {
+        return threadName;
+    }
+
+
+    /**
+     * Runs the accept loop until {@link #stop(int)} is called or an accept
+     * fails when the endpoint is no longer running. No connections are
+     * accepted while the endpoint is paused. On exit the state becomes
+     * {@link AcceptorState#ENDED} and any thread waiting in stop is released.
+     */
+    @Override
+    public void run() {
+
+        int errorDelay = 0;
+        long pauseStart = 0;
+
+        try {
+            // Loop until we receive a shutdown command
+            while (!stopCalled) {
+
+                // Loop if endpoint is paused.
+                // There are two likely scenarios here.
+                // The first scenario is that Tomcat is shutting down. In this
+                // case - and particularly for the unit tests - we want to exit
+                // this loop as quickly as possible. The second scenario is a
+                // genuine pause of the connector. In this case we want to avoid
+                // excessive CPU usage.
+                // Therefore, we start with a tight loop but if there isn't a
+                // rapid transition to stop then sleeps are introduced.
+                // < 1ms       - tight loop
+                // 1ms to 10ms - 1ms sleep
+                // > 10ms      - 10ms sleep
+                while (endpoint.isPaused() && !stopCalled) {
+                    if (state != AcceptorState.PAUSED) {
+                        pauseStart = System.nanoTime();
+                        // Entered pause state
+                        state = AcceptorState.PAUSED;
+                    }
+                    if ((System.nanoTime() - pauseStart) > 1_000_000) {
+                        // Paused for more than 1ms
+                        try {
+                            if ((System.nanoTime() - pauseStart) > 10_000_000) {
+                                Thread.sleep(10);
+                            } else {
+                                Thread.sleep(1);
+                            }
+                        } catch (InterruptedException e) {
+                            // Ignore. The loop condition re-checks stopCalled.
+                        }
+                    }
+                }
+
+                if (stopCalled) {
+                    break;
+                }
+                state = AcceptorState.RUNNING;
+
+                try {
+                    //if we have reached max connections, wait
+                    endpoint.countUpOrAwaitConnection();
+
+                    // Endpoint might have been paused while waiting for latch
+                    // If that is the case, don't accept new connections
+                    if (endpoint.isPaused()) {
+                        continue;
+                    }
+
+                    U socket = null;
+                    try {
+                        // Accept the next incoming connection from the server
+                        // socket
+                        socket = endpoint.serverSocketAccept();
+                    } catch (Exception ioe) {
+                        // We didn't get a socket
+                        endpoint.countDownConnection();
+                        if (endpoint.isRunning()) {
+                            // Introduce delay if necessary
+                            errorDelay = handleExceptionWithDelay(errorDelay);
+                            // re-throw
+                            throw ioe;
+                        } else {
+                            break;
+                        }
+                    }
+                    // Successful accept, reset the error delay
+                    errorDelay = 0;
+
+                    // Configure the socket
+                    if (!stopCalled && !endpoint.isPaused()) {
+                        // setSocketOptions() will hand the socket off to
+                        // an appropriate processor if successful
+                        if (!endpoint.setSocketOptions(socket)) {
+                            endpoint.closeSocket(socket);
+                        }
+                    } else {
+                        endpoint.destroySocket(socket);
+                    }
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    String msg = sm.getString("endpoint.accept.fail");
+                    // APR specific (Error here is org.apache.tomcat.jni.Error).
+                    // Could push this down but not sure it is worth the trouble.
+                    if (t instanceof Error) {
+                        Error e = (Error) t;
+                        if (e.getError() == 233) {
+                            // Not an error on HP-UX so log as a warning
+                            // so it can be filtered out on that platform
+                            // See bug 50273
+                            log.warn(msg, t);
+                        } else {
+                            log.error(msg, t);
+                        }
+                    } else {
+                        log.error(msg, t);
+                    }
+                }
+            }
+        } finally {
+            // Publish the terminal state before releasing the latch so that a
+            // caller returning from stop(int) cannot observe a stale state.
+            state = AcceptorState.ENDED;
+            stopLatch.countDown();
+        }
+    }
+
+
+    /**
+     * Signals the Acceptor to stop, waiting at most 10 seconds for the stop to
+     * complete before returning. If the stop does not complete in that time a
+     * warning will be logged.
+     *
+     * @deprecated This method will be removed in Tomcat 10.1.x onwards.
+     *             Use {@link #stop(int)} instead.
+     */
+    @Deprecated
+    public void stop() {
+        stop(10);
+    }
+
+
+    /**
+     * Signals the Acceptor to stop, optionally waiting for that stop process
+     * to complete before returning. If a wait is requested and the stop does
+     * not complete in that time a warning will be logged. If the calling
+     * thread is interrupted while waiting, a warning is logged, the wait is
+     * abandoned and the thread's interrupt status is restored.
+     *
+     * @param waitSeconds The time to wait in seconds. Use a value of zero or
+     *                    less for no wait.
+     */
+    public void stop(int waitSeconds) {
+        stopCalled = true;
+        if (waitSeconds > 0) {
+            try {
+                if (!stopLatch.await(waitSeconds, TimeUnit.SECONDS)) {
+                    log.warn(sm.getString("acceptor.stop.fail", getThreadName()));
+                }
+            } catch (InterruptedException e) {
+                log.warn(sm.getString("acceptor.stop.interrupted", getThreadName()), e);
+                // Restore the interrupt status for the benefit of the caller
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+
+    /**
+     * Handles exceptions where a delay is required to prevent a Thread from
+     * entering a tight loop which will consume CPU and may also trigger large
+     * amounts of logging. For example, this can happen if the ulimit for open
+     * files is reached.
+     *
+     * @param currentErrorDelay The current delay being applied on failure
+     * @return  The delay to apply on the next failure
+     */
+    protected int handleExceptionWithDelay(int currentErrorDelay) {
+        // Don't delay on first exception
+        if (currentErrorDelay > 0) {
+            try {
+                Thread.sleep(currentErrorDelay);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+        }
+
+        // On subsequent exceptions, start the delay at 50ms, doubling the delay
+        // on every subsequent exception until the delay reaches 1.6 seconds.
+        if (currentErrorDelay == 0) {
+            return INITIAL_ERROR_DELAY;
+        } else if (currentErrorDelay < MAX_ERROR_DELAY) {
+            return currentErrorDelay * 2;
+        } else {
+            return MAX_ERROR_DELAY;
+        }
+    }
+
+
+    /**
+     * The lifecycle states of an Acceptor.
+     */
+    public enum AcceptorState {
+        NEW, RUNNING, PAUSED, ENDED
+    }
+}
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index 3fa5a2d..e61c2d5 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -27,7 +27,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -58,8 +57,8 @@ import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.buf.ByteBufferUtils;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.compat.JrePlatform;
-import org.apache.tomcat.util.net.AbstractEndpoint.Acceptor.AcceptorState;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.Acceptor.AcceptorState;
 import org.apache.tomcat.util.net.openssl.OpenSSLContext;
 import org.apache.tomcat.util.net.openssl.OpenSSLUtil;
 
@@ -113,10 +112,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
     private long previousAcceptedSocketNanoTime = 0;
 
 
-
-    private final Map<Long,AprSocketWrapper> connections = new ConcurrentHashMap<>();
-
-
     // ------------------------------------------------------------ Constructor
 
     public AprEndpoint() {
@@ -125,9 +120,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         // - mandatory use of direct buffers forces output buffering
         // - needs extra output flushes due to buffering
         setUseAsyncIO(false);
-        // Need to override the default for maxConnections to align it with what
-        // was pollerSize (before the two were merged)
-        setMaxConnections(8 * 1024);
     }
 
 
@@ -479,7 +471,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 sendfile.start();
             }
 
-            startAcceptorThreads();
+            startAcceptorThread();
         }
     }
 
@@ -489,12 +481,13 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
      */
     @Override
     public void stopInternal() {
-        releaseConnectionLatch();
         if (!paused) {
             pause();
         }
         if (running) {
             running = false;
+            // Stop new connections being accepted.
+            acceptor.stop(10);
 
             // Stop the Poller calling select
             poller.stop();
@@ -503,30 +496,14 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 sendfile.stop();
             }
 
-            // Wait for the acceptor to shutdown.
-            // Should only be one thread but retain this code in case the
-            // acceptor start has been customised.
-            for (AbstractEndpoint.Acceptor acceptor : acceptors) {
-                long waitLeft = 10000;
-                while (waitLeft > 0 &&
-                        acceptor.getState() != AcceptorState.ENDED &&
-                        serverSock != 0) {
-                    try {
-                        Thread.sleep(50);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                    waitLeft -= 50;
-                }
-                if (waitLeft == 0) {
-                    log.warn(sm.getString("endpoint.warn.unlockAcceptorFailed",
-                            acceptor.getThreadName()));
-                   // If the Acceptor is still running force
-                   // the hard socket close.
-                   if (serverSock != 0) {
-                       Socket.shutdown(serverSock, Socket.APR_SHUTDOWN_READ);
-                       serverSock = 0;
-                   }
+            // Wait for the acceptor to shutdown
+            if (acceptor.getState() != AcceptorState.ENDED && !getBindOnInit()) {
+                log.warn(sm.getString("endpoint.warn.unlockAcceptorFailed", acceptor.getThreadName()));
+                // If the Acceptor is still running force
+                // the hard socket close.
+                if (serverSock != 0) {
+                    Socket.shutdown(serverSock, Socket.APR_SHUTDOWN_READ);
+                    serverSock = 0;
                 }
             }
 
@@ -651,12 +628,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
     // ------------------------------------------------------ Protected Methods
 
-    @Override
-    protected AbstractEndpoint.Acceptor createAcceptor() {
-        return new Acceptor();
-    }
-
-
     /**
      * Process the specified connection.
      * @param socketWrapper The socket wrapper
@@ -763,15 +734,16 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
      *  and processing may continue, <code>false</code> if the socket needs to be
      *  close immediately
      */
-    protected boolean processSocketWithOptions(long socket) {
+    @Override
+    protected boolean setSocketOptions(Long socket) {
         try {
             if (log.isDebugEnabled()) {
                 log.debug(sm.getString("endpoint.debug.socket", socket));
             }
 
-            // Do the duplicate accept check here rather than in Acceptor.run()
+            // Do the duplicate accept check here rather than in serverSocketAccept()
             // so we can cache the results in the SocketWrapper
-            AprSocketWrapper wrapper = new AprSocketWrapper(Long.valueOf(socket), this);
+            AprSocketWrapper wrapper = new AprSocketWrapper(socket, this);
             // Bug does not affect Windows. Skip the check on that platform.
             if (!JrePlatform.IS_WINDOWS) {
                 long currentNanoTime = System.nanoTime();
@@ -787,7 +759,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 previousAcceptedSocketNanoTime = currentNanoTime;
             }
 
-            connections.put(Long.valueOf(socket), wrapper);
+            connections.put(socket, wrapper);
             wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
             wrapper.setReadTimeout(getConnectionTimeout());
             wrapper.setWriteTimeout(getConnectionTimeout());
@@ -805,6 +777,24 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
     }
 
 
+    @Override
+    protected Long serverSocketAccept() throws Exception {
+        // See setSocketOptions(Long) for duplicate accept check
+        long socket = Socket.accept(serverSock);
+        if (socket == 0) {
+            throw new IOException(sm.getString("endpoint.err.accept", getName()));
+        }
+        if (log.isDebugEnabled()) {
+            long sa = Address.get(Socket.APR_REMOTE, socket);
+            Sockaddr addr = Address.getInfo(sa);
+            log.debug(sm.getString("endpoint.apr.remoteport",
+                    Long.valueOf(socket),
+                    Long.valueOf(addr.port)));
+        }
+        return Long.valueOf(socket);
+    }
+
+
     /**
      * Process the given socket. Typically keep alive or upgraded protocol.
      *
@@ -822,7 +812,13 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
             // close in case won't cause an issue.
             return false;
         }
-        return processSocket(socketWrapper, event, true);
+        if (event == SocketEvent.OPEN_READ && socketWrapper.readOperation != null) {
+            return socketWrapper.readOperation.process();
+        } else if (event == SocketEvent.OPEN_WRITE && socketWrapper.writeOperation != null) {
+            return socketWrapper.writeOperation.process();
+        } else {
+            return processSocket(socketWrapper, event, true);
+        }
     }
 
 
@@ -833,26 +829,19 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
     }
 
 
-    private void closeSocket(long socket) {
-        // Once this is called, the mapping from socket to wrapper will no
-        // longer be required.
-        SocketWrapperBase<Long> wrapper = connections.remove(Long.valueOf(socket));
-        if (wrapper != null) {
-            // Cast to avoid having to catch an IOE that is never thrown.
-            ((AprSocketWrapper) wrapper).close();
-        }
+    private void closeSocketInternal(long socket) {
+        closeSocket(Long.valueOf(socket));
     }
 
-    /*
-     * This method should only be called if there is no chance that the socket
-     * is currently being used by the Poller. It is generally a bad idea to call
-     * this directly from a known error condition.
-     */
-    private void destroySocket(long socket) {
-        connections.remove(Long.valueOf(socket));
+    @Override
+    protected void destroySocket(Long socket) {
+        countDownConnection();
+        destroySocketInternal(socket.longValue());
+    }
+
+    private void destroySocketInternal(long socket) {
         if (log.isDebugEnabled()) {
-            String msg = sm.getString("endpoint.debug.destroySocket",
-                    Long.valueOf(socket));
+            String msg = sm.getString("endpoint.debug.destroySocket", Long.valueOf(socket));
             if (log.isTraceEnabled()) {
                 log.trace(msg, new Exception());
             } else {
@@ -865,7 +854,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         // are closed when calling stop() followed by start().
         if (socket != 0) {
             Socket.destroy(socket);
-            countDownConnection();
         }
     }
 
@@ -874,133 +862,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         return log;
     }
 
-    // --------------------------------------------------- Acceptor Inner Class
-    /**
-     * The background thread that listens for incoming TCP/IP connections and
-     * hands them off to an appropriate processor.
-     */
-    protected class Acceptor extends AbstractEndpoint.Acceptor {
-
-        private final Log log = LogFactory.getLog(AprEndpoint.Acceptor.class); // must not be static
-
-        @Override
-        public void run() {
-
-            int errorDelay = 0;
-            long pauseStart = 0;
-
-            // Loop until we receive a shutdown command
-            while (running) {
-
-                // Loop if endpoint is paused.
-                // There are two likely scenarios here.
-                // The first scenario is that Tomcat is shutting down. In this
-                // case - and particularly for the unit tests - we want to exit
-                // this loop as quickly as possible. The second scenario is a
-                // genuine pause of the connector. In this case we want to avoid
-                // excessive CPU usage.
-                // Therefore, we start with a tight loop but if there isn't a
-                // rapid transition to stop then sleeps are introduced.
-                // < 1ms       - tight loop
-                // 1ms to 10ms - 1ms sleep
-                // > 10ms      - 10ms sleep
-                while (paused && running) {
-                    if (state != AcceptorState.PAUSED) {
-                        pauseStart = System.nanoTime();
-                        // Entered pause state
-                        state = AcceptorState.PAUSED;
-                    }
-                    if ((System.nanoTime() - pauseStart) > 1_000_000) {
-                        // Paused for more than 1ms
-                        try {
-                            if ((System.nanoTime() - pauseStart) > 10_000_000) {
-                                Thread.sleep(10);
-                            } else {
-                                Thread.sleep(1);
-                            }
-                        } catch (InterruptedException e) {
-                            // Ignore
-                        }
-                    }
-                }
-
-                if (!running) {
-                    break;
-                }
-                state = AcceptorState.RUNNING;
-
-                try {
-                    //if we have reached max connections, wait
-                    countUpOrAwaitConnection();
-
-                    long socket = 0;
-                    try {
-                        // Accept the next incoming connection from the server
-                        // socket
-                        // See processSocketWithOptions(long) for duplicate
-                        // accept check
-                        socket = Socket.accept(serverSock);
-                        if (socket == 0) {
-                            throw new IOException(sm.getString("endpoint.err.accept", getName()));
-                        }
-                        if (log.isDebugEnabled()) {
-                            long sa = Address.get(Socket.APR_REMOTE, socket);
-                            Sockaddr addr = Address.getInfo(sa);
-                            log.debug(sm.getString("endpoint.apr.remoteport",
-                                    Long.valueOf(socket),
-                                    Long.valueOf(addr.port)));
-                        }
-                    } catch (Exception e) {
-                        // We didn't get a socket
-                        countDownConnection();
-                        if (running) {
-                            // Introduce delay if necessary
-                            errorDelay = handleExceptionWithDelay(errorDelay);
-                            // re-throw
-                            throw e;
-                        } else {
-                            break;
-                        }
-                    }
-                    // Successful accept, reset the error delay
-                    errorDelay = 0;
-
-                    if (running && !paused) {
-                        // Hand this socket off to an appropriate processor
-                        if (!processSocketWithOptions(socket)) {
-                            // Close socket right away
-                            closeSocket(socket);
-                        }
-                    } else {
-                        // Close socket right away
-                        // No code path could have added the socket to the
-                        // Poller so use destroySocket()
-                        destroySocket(socket);
-                    }
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    String msg = sm.getString("endpoint.accept.fail");
-                    if (t instanceof Error) {
-                        Error e = (Error) t;
-                        if (e.getError() == 233) {
-                            // Not an error on HP-UX so log as a warning
-                            // so it can be filtered out on that platform
-                            // See bug 50273
-                            log.warn(msg, t);
-                        } else {
-                            log.error(msg, t);
-                        }
-                    } else {
-                            log.error(msg, t);
-                    }
-                }
-                // The processor will recycle itself when it finishes
-            }
-            state = AcceptorState.ENDED;
-        }
-    }
-
-
     // -------------------------------------------------- SocketInfo Inner Class
 
     public static class SocketInfo {
@@ -1316,7 +1177,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 removeFromPoller(info.socket);
                 // Poller isn't running at this point so use destroySocket()
                 // directly
-                destroySocket(info.socket);
+                closeSocketInternal(info.socket);
+                destroySocketInternal(info.socket);
                 info = closeList.get();
             }
             closeList.clear();
@@ -1327,7 +1189,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 removeFromPoller(info.socket);
                 // Poller isn't running at this point so use destroySocket()
                 // directly
-                destroySocket(info.socket);
+                closeSocketInternal(info.socket);
+                destroySocketInternal(info.socket);
                 info = addList.get();
             }
             addList.clear();
@@ -1335,7 +1198,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
             int rv = Poll.pollset(aprPoller, desc);
             if (rv > 0) {
                 for (int n = 0; n < rv; n++) {
-                    destroySocket(desc[n*2+1]);
+                    closeSocketInternal(desc[n*2+1]);
+                    destroySocketInternal(desc[n*2+1]);
                 }
             }
             Pool.destroy(pool);
@@ -1451,7 +1315,15 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 SocketWrapperBase<Long> socketWrapper = connections.get(Long.valueOf(socket));
                 if (socketWrapper != null) {
                     socketWrapper.setError(new SocketTimeoutException());
-                    processSocket(socketWrapper, SocketEvent.ERROR, true);
+                    if (socketWrapper.readOperation != null || socketWrapper.writeOperation != null) {
+                        if (socketWrapper.readOperation != null) {
+                            socketWrapper.readOperation.process();
+                        } else {
+                            socketWrapper.writeOperation.process();
+                        }
+                    } else {
+                        processSocket(socketWrapper, SocketEvent.ERROR, true);
+                    }
                 }
                 socket = timeouts.check(date);
             }
@@ -1550,7 +1422,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                         while (info != null) {
                             localAddList.remove(info.socket);
                             removeFromPoller(info.socket);
-                            destroySocket(info.socket);
+                            closeSocketInternal(info.socket);
+                            destroySocketInternal(info.socket);
                             info = localCloseList.get();
                         }
                     }
@@ -1565,8 +1438,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                         Long.valueOf(info.socket)));
                             }
                             timeouts.remove(info.socket);
-                            AprSocketWrapper wrapper = connections.get(
-                                    Long.valueOf(info.socket));
+                            AprSocketWrapper wrapper =
+                                    (AprSocketWrapper) connections.get(Long.valueOf(info.socket));
                             if (wrapper != null) {
                                 if (info.read() || info.write()) {
                                     wrapper.pollerFlags = wrapper.pollerFlags |
@@ -1579,7 +1452,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                     // the poller.
                                     removeFromPoller(info.socket);
                                     if (!addToPoller(info.socket, wrapper.pollerFlags)) {
-                                        closeSocket(info.socket);
+                                        wrapper.close();
                                     } else {
                                         timeouts.add(info.socket,
                                                 System.currentTimeMillis() +
@@ -1587,7 +1460,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                     }
                                 } else {
                                     // Should never happen.
-                                    closeSocket(info.socket);
+                                    wrapper.close();
                                     getLog().warn(sm.getString(
                                             "endpoint.apr.pollAddInvalid", info));
                                 }
@@ -1611,8 +1484,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                         Long.valueOf(desc[n*2])));
                             }
                             long timeout = timeouts.remove(desc[n*2+1]);
-                            AprSocketWrapper wrapper = connections.get(
-                                    Long.valueOf(desc[n*2+1]));
+                            AprSocketWrapper wrapper = (AprSocketWrapper)
+                                    connections.get(Long.valueOf(desc[n*2+1]));
                             if (wrapper == null) {
                                 // Socket was closed in another thread while still in
                                 // the Poller but wasn't removed from the Poller before
@@ -1637,31 +1510,31 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                     // Error probably occurred during a non-blocking read
                                     if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                         // Close socket and clear pool
-                                        closeSocket(desc[n*2+1]);
+                                        wrapper.close();
                                     }
                                 } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                                     // Error probably occurred during a non-blocking write
                                     if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                         // Close socket and clear pool
-                                        closeSocket(desc[n*2+1]);
+                                        wrapper.close();
                                     }
                                 } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
                                     // Can't tell what was happening when the error occurred but the
                                     // socket is registered for non-blocking read so use that
                                     if (!processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                         // Close socket and clear pool
-                                        closeSocket(desc[n*2+1]);
+                                        wrapper.close();
                                     }
                                 } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                                     // Can't tell what was happening when the error occurred but the
                                     // socket is registered for non-blocking write so use that
                                     if (!processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                         // Close socket and clear pool
-                                        closeSocket(desc[n*2+1]);
+                                        wrapper.close();
                                     }
                                 } else {
                                     // Close socket and clear pool
-                                    closeSocket(desc[n*2+1]);
+                                    wrapper.close();
                                 }
                             } else if (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN)
                                     || ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT)) {
@@ -1670,14 +1543,14 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                         !processSocket(desc[n*2+1], SocketEvent.OPEN_READ)) {
                                     error = true;
                                     // Close socket and clear pool
-                                    closeSocket(desc[n*2+1]);
+                                    wrapper.close();
                                 }
                                 if (!error &&
                                         ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) &&
                                         !processSocket(desc[n*2+1], SocketEvent.OPEN_WRITE)) {
                                     // Close socket and clear pool
                                     error = true;
-                                    closeSocket(desc[n*2+1]);
+                                    wrapper.close();
                                 }
                                 if (!error && wrapper.pollerFlags != 0) {
                                     // If socket was registered for multiple events but
@@ -1709,7 +1582,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                         "endpoint.apr.pollUnknownEvent",
                                         Long.valueOf(desc[n*2])));
                                 // Close socket and clear pool
-                                closeSocket(desc[n*2+1]);
+                                wrapper.close();
                             }
                         }
                     } else if (rv < 0) {
@@ -1864,13 +1737,13 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
             // Close any socket remaining in the add queue
             for (int i = (addS.size() - 1); i >= 0; i--) {
                 SendfileData data = addS.get(i);
-                closeSocket(data.socket);
+                closeSocketInternal(data.socket);
             }
             // Close all sockets still in the poller
             int rv = Poll.pollset(sendfilePollset, desc);
             if (rv > 0) {
                 for (int n = 0; n < rv; n++) {
-                    closeSocket(desc[n*2+1]);
+                    closeSocketInternal(desc[n*2+1]);
                 }
             }
             Pool.destroy(pool);
@@ -2005,7 +1878,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                             Integer.valueOf(rv),
                                             Error.strerror(rv)));
                                     // Can't do anything: close the socket right away
-                                    closeSocket(data.socket);
+                                    closeSocketInternal(data.socket);
                                 }
                             }
                             addS.clear();
@@ -2027,7 +1900,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                 remove(state);
                                 // Destroy file descriptor pool, which should close the file
                                 // Close the socket, as the response would be incomplete
-                                closeSocket(state.socket);
+                                closeSocketInternal(state.socket);
                                 continue;
                             }
                             // Write some data using sendfile
@@ -2039,7 +1912,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                 remove(state);
                                 // Close the socket, as the response would be incomplete
                                 // This will close the file too.
-                                closeSocket(state.socket);
+                                closeSocketInternal(state.socket);
                                 continue;
                             }
 
@@ -2051,7 +1924,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                 case NONE: {
                                     // Close the socket since this is
                                     // the end of the not keep-alive request.
-                                    closeSocket(state.socket);
+                                    closeSocketInternal(state.socket);
                                     break;
                                 }
                                 case PIPELINED: {
@@ -2060,7 +1933,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                     Socket.timeoutSet(state.socket, getConnectionTimeout() * 1000);
                                     // Process the pipelined request data
                                     if (!processSocket(state.socket, SocketEvent.OPEN_READ)) {
-                                        closeSocket(state.socket);
+                                        closeSocketInternal(state.socket);
                                     }
                                     break;
                                 }
@@ -2109,7 +1982,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                                 remove(state);
                                 // Destroy file descriptor pool, which should close the file
                                 // Close the socket, as the response would be incomplete
-                                closeSocket(state.socket);
+                                closeSocketInternal(state.socket);
                             }
                         }
                     }
@@ -2157,7 +2030,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                     } else {
                         // Close socket and pool
                         getHandler().process(socket, SocketEvent.CONNECT_FAIL);
-                        closeSocket(socket.getSocket().longValue());
+                        socket.close();
                         socket = null;
                     }
                 } else {
@@ -2165,7 +2038,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                     if (!setSocketOptions(socket)) {
                         // Close socket and pool
                         getHandler().process(socket, SocketEvent.CONNECT_FAIL);
-                        closeSocket(socket.getSocket().longValue());
+                        socket.close();
                         socket = null;
                         return;
                     }
@@ -2173,7 +2046,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                     Handler.SocketState state = getHandler().process(socket, SocketEvent.OPEN_READ);
                     if (state == Handler.SocketState.CLOSED) {
                         // Close socket and pool
-                        closeSocket(socket.getSocket().longValue());
+                        socket.close();
                         socket = null;
                     }
                 }
@@ -2202,7 +2075,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 SocketState state = getHandler().process(socketWrapper, event);
                 if (state == Handler.SocketState.CLOSED) {
                     // Close socket and pool
-                    closeSocket(socketWrapper.getSocket().longValue());
+                    socketWrapper.close();
                 }
             } finally {
                 socketWrapper = null;
@@ -2335,10 +2208,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
 
         private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
-            if (isClosed()) {
-                throw new IOException(sm.getString("socket.apr.closed", getSocket()));
-            }
-
             Lock readLock = getBlockingStatusReadLock();
             WriteLock writeLock = getBlockingStatusWriteLock();
 
@@ -2346,6 +2215,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
             int result = 0;
             readLock.lock();
             try {
+                checkClosed();
                 if (getBlockingStatus() == block) {
                     if (block) {
                         Socket.timeoutSet(getSocket().longValue(), getReadTimeout() * 1000);
@@ -2361,6 +2231,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
             if (!readDone) {
                 writeLock.lock();
                 try {
+                    checkClosed();
                     // Set the current settings for this socket
                     setBlockingStatus(block);
                     if (block) {
@@ -2431,44 +2302,40 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         }
 
 
+        private void checkClosed() throws IOException {
+            if (isClosed()) {
+                throw new IOException(sm.getString("socket.apr.closed", getSocket()));
+            }
+        }
+
+
         @Override
         protected void doClose() {
             if (log.isDebugEnabled()) {
                 log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
             }
-            try {
-                getEndpoint().getHandler().release(this);
-            } catch (Throwable e) {
-                ExceptionUtils.handleThrowable(e);
-                if (log.isDebugEnabled()) {
-                    log.error(sm.getString("endpoint.debug.handlerRelease"), e);
-                }
-            }
+            getEndpoint().connections.remove(getSocket());
+            socketBufferHandler.free();
             socketBufferHandler = SocketBufferHandler.EMPTY;
             nonBlockingWriteBuffer.clear();
-            synchronized (closed) {
-                if (sslOutputBuffer != null) {
-                    ByteBufferUtils.cleanDirectBuffer(sslOutputBuffer);
-                }
-                Poller poller = ((AprEndpoint) getEndpoint()).getPoller();
-                if (poller != null) {
-                    poller.close(getSocket().longValue());
-                }
+            if (sslOutputBuffer != null) {
+                ByteBufferUtils.cleanDirectBuffer(sslOutputBuffer);
+            }
+            Poller poller = ((AprEndpoint) getEndpoint()).getPoller();
+            if (poller != null) {
+                poller.close(getSocket().longValue());
             }
         }
 
 
         @Override
         protected void doWrite(boolean block, ByteBuffer from) throws IOException {
-            if (isClosed()) {
-                throw new IOException(sm.getString("socket.apr.closed", getSocket()));
-            }
-
             Lock readLock = getBlockingStatusReadLock();
             WriteLock writeLock = getBlockingStatusWriteLock();
 
             readLock.lock();
             try {
+                checkClosed();
                 if (getBlockingStatus() == block) {
                     if (block) {
                         Socket.timeoutSet(getSocket().longValue(), getWriteTimeout() * 1000);
@@ -2482,6 +2349,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
             writeLock.lock();
             try {
+                checkClosed();
                 // Set the current settings for this socket
                 setBlockingStatus(block);
                 if (block) {
diff --git a/java/org/apache/tomcat/util/net/Nio2Channel.java b/java/org/apache/tomcat/util/net/Nio2Channel.java
index a22b7ab..a2612fd 100644
--- a/java/org/apache/tomcat/util/net/Nio2Channel.java
+++ b/java/org/apache/tomcat/util/net/Nio2Channel.java
@@ -38,7 +38,7 @@ public class Nio2Channel implements AsynchronousByteChannel {
 
     protected final SocketBufferHandler bufHandler;
     protected AsynchronousSocketChannel sc = null;
-    protected SocketWrapperBase<Nio2Channel> socket = null;
+    protected SocketWrapperBase<Nio2Channel> socketWrapper = null;
 
     public Nio2Channel(SocketBufferHandler bufHandler) {
         this.bufHandler = bufHandler;
@@ -48,14 +48,14 @@ public class Nio2Channel implements AsynchronousByteChannel {
      * Reset the channel.
      *
      * @param channel The new async channel to associate with this NIO2 channel
-     * @param socket  The new socket to associate with this NIO2 channel
+     * @param socketWrapper The new socket to associate with this NIO2 channel
      *
      * @throws IOException If a problem was encountered resetting the channel
      */
-    public void reset(AsynchronousSocketChannel channel, SocketWrapperBase<Nio2Channel> socket)
+    public void reset(AsynchronousSocketChannel channel, SocketWrapperBase<Nio2Channel> socketWrapper)
             throws IOException {
         this.sc = channel;
-        this.socket = socket;
+        this.socketWrapper = socketWrapper;
         bufHandler.reset();
     }
 
@@ -66,8 +66,8 @@ public class Nio2Channel implements AsynchronousByteChannel {
         bufHandler.free();
     }
 
-    public SocketWrapperBase<Nio2Channel> getSocket() {
-        return socket;
+    SocketWrapperBase<Nio2Channel> getSocketWrapper() {
+        return socketWrapper;
     }
 
 
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 217cdb8..60d24d2 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -46,6 +46,7 @@ import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.compat.JrePlatform;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.Acceptor.AcceptorState;
 import org.apache.tomcat.util.net.jsse.JSSESupport;
 
 /**
@@ -88,19 +89,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
     private long previousAcceptedSocketNanoTime = 0;
 
 
-    public Nio2Endpoint() {
-        // Override the defaults for NIO2
-        // Disable maxConnections by default for NIO2 (see BZ58103)
-        setMaxConnections(-1);
-    }
-
-
     // ------------------------------------------------------------- Properties
 
-    public void setSocketProperties(SocketProperties socketProperties) {
-        this.socketProperties = socketProperties;
-    }
-
     /**
      * Is deferAccept supported?
      */
@@ -181,23 +171,42 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             }
 
             initializeConnectionLatch();
-            startAcceptorThreads();
+            startAcceptorThread();
         }
     }
 
+    @Override
+    protected void startAcceptorThread() {
+        // Rather than starting a real acceptor thread, this initiates an
+        // asynchronous accept operation
+        if (acceptor == null) {
+            acceptor = new Nio2Acceptor(this);
+            acceptor.setThreadName(getName() + "-Acceptor");
+        }
+        acceptor.state = AcceptorState.RUNNING;
+        getExecutor().execute(acceptor);
+    }
+
+    @Override
+    public void resume() {
+        super.resume();
+        if (isRunning()) {
+            acceptor.state = AcceptorState.RUNNING;
+            getExecutor().execute(acceptor);
+        }
+    }
 
     /**
      * Stop the endpoint. This will cause all processing threads to stop.
      */
     @Override
     public void stopInternal() {
-        releaseConnectionLatch();
         if (!paused) {
             pause();
         }
         if (running) {
             running = false;
-            unlockAccept();
+            acceptor.stop(10);
             // Use the executor to avoid binding the main thread if something bad
             // occurs and unbind will also wait for a bit for it to complete
             getExecutor().execute(new Runnable() {
@@ -205,8 +214,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 public void run() {
                     // Then close all active connections if any remain
                     try {
-                        for (Nio2Channel channel : getHandler().getOpenSockets()) {
-                            channel.getSocket().close();
+                        for (SocketWrapperBase<Nio2Channel> wrapper : getConnections()) {
+                            wrapper.close();
                         }
                     } catch (Throwable t) {
                         ExceptionUtils.handleThrowable(t);
@@ -289,20 +298,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
     // ------------------------------------------------------ Protected Methods
 
-
-    public int getWriteBufSize() {
-        return socketProperties.getTxBufSize();
-    }
-
-    public int getReadBufSize() {
-        return socketProperties.getRxBufSize();
-    }
-
-    @Override
-    protected AbstractEndpoint.Acceptor createAcceptor() {
-        return new Acceptor();
-    }
-
     /**
      * Process the specified connection.
      * @param socket The socket channel
@@ -310,9 +305,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
      *  and processing may continue, <code>false</code> if the socket needs to be
      *  close immediately
      */
+    @Override
     protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
+        Nio2SocketWrapper socketWrapper = null;
         try {
-            socketProperties.setProperties(socket);
+            // Allocate channel and wrapper
             Nio2Channel channel = null;
             if (nioChannels != null) {
                 channel = nioChannels.pop();
@@ -328,161 +325,188 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                     channel = new Nio2Channel(bufhandler);
                 }
             }
-            Nio2SocketWrapper socketWrapper = new Nio2SocketWrapper(channel, this);
-            channel.reset(socket, socketWrapper);
-            socketWrapper.setReadTimeout(getSocketProperties().getSoTimeout());
-            socketWrapper.setWriteTimeout(getSocketProperties().getSoTimeout());
-            socketWrapper.setKeepAliveLeft(Nio2Endpoint.this.getMaxKeepAliveRequests());
+            Nio2SocketWrapper newWrapper = new Nio2SocketWrapper(channel, this);
+            channel.reset(socket, newWrapper);
+            connections.put(socket, newWrapper);
+            socketWrapper = newWrapper;
+
+            // Set socket properties
+            socketProperties.setProperties(socket);
+
             socketWrapper.setReadTimeout(getConnectionTimeout());
             socketWrapper.setWriteTimeout(getConnectionTimeout());
-            // Continue processing on another thread
-            return processSocket(socketWrapper, SocketEvent.OPEN_READ, true);
+            socketWrapper.setKeepAliveLeft(Nio2Endpoint.this.getMaxKeepAliveRequests());
+            // Continue processing on the current thread because the acceptor is async
+            return processSocket(socketWrapper, SocketEvent.OPEN_READ, false);
         } catch (Throwable t) {
             ExceptionUtils.handleThrowable(t);
-            log.error("",t);
+            log.error(sm.getString("endpoint.socketOptionsError"), t);
+            if (socketWrapper == null) {
+                destroySocket(socket);
+            }
         }
-        // Tell to close the socket
+        // Tell the caller to close the socket if needed
         return false;
     }
 
 
     @Override
-    protected SocketProcessorBase<Nio2Channel> createSocketProcessor(
-            SocketWrapperBase<Nio2Channel> socketWrapper, SocketEvent event) {
-        return new SocketProcessor(socketWrapper, event);
+    protected void destroySocket(AsynchronousSocketChannel socket) {
+        countDownConnection();
+        try {
+            socket.close();
+        } catch (IOException ioe) {
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("endpoint.err.close"), ioe);
+            }
+        }
+    }
+
+
+    protected SynchronizedStack<Nio2Channel> getNioChannels() {
+        return nioChannels;
+    }
+
+
+    @Override
+    protected NetworkChannel getServerSocket() {
+        return serverSock;
+    }
+
+
+    @Override
+    protected AsynchronousSocketChannel serverSocketAccept() throws Exception {
+        AsynchronousSocketChannel result = serverSock.accept().get();
+
+        // Bug does not affect Windows. Skip the check on that platform.
+        if (!JrePlatform.IS_WINDOWS) {
+            SocketAddress currentRemoteAddress = result.getRemoteAddress();
+            long currentNanoTime = System.nanoTime();
+            if (currentRemoteAddress.equals(previousAcceptedSocketRemoteAddress) &&
+                    currentNanoTime - previousAcceptedSocketNanoTime < 1000) {
+                throw new IOException(sm.getString("endpoint.err.duplicateAccept"));
             }
+            previousAcceptedSocketRemoteAddress = currentRemoteAddress;
+            previousAcceptedSocketNanoTime = currentNanoTime;
+        }
+
+        return result;
+    }
+
 
     @Override
     protected Log getLog() {
         return log;
-        }
+    }
 
 
     @Override
-    protected NetworkChannel getServerSocket() {
-        return serverSock;
+    protected SocketProcessorBase<Nio2Channel> createSocketProcessor(
+            SocketWrapperBase<Nio2Channel> socketWrapper, SocketEvent event) {
+        return new SocketProcessor(socketWrapper, event);
     }
 
 
-    // --------------------------------------------------- Acceptor Inner Class
-    /**
-     * With NIO2, the main acceptor thread only initiates the initial accept
-     * but periodically checks that the connector is still accepting (if not
-     * it will attempt to start again).
-     */
-    protected class Acceptor extends AbstractEndpoint.Acceptor {
+    protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
+        implements CompletionHandler<AsynchronousSocketChannel, Void> {
+
+        protected int errorDelay = 0;
+
+        public Nio2Acceptor(AbstractEndpoint<?, AsynchronousSocketChannel> endpoint) {
+            super(endpoint);
+        }
 
         @Override
         public void run() {
-
-            int errorDelay = 0;
-            long pauseStart = 0;
-
-            // Loop until we receive a shutdown command
-            while (running) {
-
-                // Loop if endpoint is paused.
-                // There are two likely scenarios here.
-                // The first scenario is that Tomcat is shutting down. In this
-                // case - and particularly for the unit tests - we want to exit
-                // this loop as quickly as possible. The second scenario is a
-                // genuine pause of the connector. In this case we want to avoid
-                // excessive CPU usage.
-                // Therefore, we start with a tight loop but if there isn't a
-                // rapid transition to stop then sleeps are introduced.
-                // < 1ms       - tight loop
-                // 1ms to 10ms - 1ms sleep
-                // > 10ms      - 10ms sleep
-                while (paused && running) {
-                    if (state != AcceptorState.PAUSED) {
-                        pauseStart = System.nanoTime();
-                        // Entered pause state
-                        state = AcceptorState.PAUSED;
-                    }
-                    if ((System.nanoTime() - pauseStart) > 1_000_000) {
-                        // Paused for more than 1ms
-                        try {
-                            if ((System.nanoTime() - pauseStart) > 10_000_000) {
-                                Thread.sleep(10);
-                            } else {
-                                Thread.sleep(1);
-                            }
-                        } catch (InterruptedException e) {
-                            // Ignore
-                        }
-                    }
+            // The initial accept will be called in a separate utility thread
+            if (!isPaused()) {
+                // If we have reached max connections, wait
+                try {
+                    countUpOrAwaitConnection();
+                } catch (InterruptedException e) {
+                    // Ignore
                 }
-
-                if (!running) {
-                    break;
+                if (!isPaused()) {
+                    // Note: as a special behavior, the completion handler for accept is
+                    // always called in a separate thread.
+                    serverSock.accept(null, this);
+                } else {
+                    state = AcceptorState.PAUSED;
                 }
-                state = AcceptorState.RUNNING;
+            } else {
+                state = AcceptorState.PAUSED;
+            }
+        }
 
-                try {
-                    //if we have reached max connections, wait
-                    countUpOrAwaitConnection();
+        /**
+         * Signals the Acceptor to stop.
+         *
+         * @param waitSeconds Ignored for NIO2.
+         *
+         */
+        @Override
+        public void stop(int waitSeconds) {
+            acceptor.state = AcceptorState.ENDED;
+        }
 
-                    AsynchronousSocketChannel socket = null;
+        @Override
+        public void completed(AsynchronousSocketChannel socket,
+                Void attachment) {
+            // Successful accept, reset the error delay
+            errorDelay = 0;
+            // Continue processing the socket on the current thread
+            // Configure the socket
+            if (isRunning() && !isPaused()) {
+                if (getMaxConnections() == -1) {
+                    serverSock.accept(null, this);
+                } else if (getConnectionCount() < getMaxConnections()) {
                     try {
-                        // Accept the next incoming connection from the server
-                        // socket
-                        socket = serverSock.accept().get();
-
-                        // Bug does not affect Windows. Skip the check on that platform.
-                        if (!JrePlatform.IS_WINDOWS) {
-                            SocketAddress currentRemoteAddress = socket.getRemoteAddress();
-                            long currentNanoTime = System.nanoTime();
-                            if (currentRemoteAddress.equals(previousAcceptedSocketRemoteAddress) &&
-                                    currentNanoTime - previousAcceptedSocketNanoTime < 1000) {
-                                throw new IOException(sm.getString("endpoint.err.duplicateAccept"));
-                            }
-                            previousAcceptedSocketRemoteAddress = currentRemoteAddress;
-                            previousAcceptedSocketNanoTime = currentNanoTime;
-                        }
-                    } catch (Exception e) {
-                        // We didn't get a socket
-                        countDownConnection();
-                        if (running) {
-                            // Introduce delay if necessary
-                            errorDelay = handleExceptionWithDelay(errorDelay);
-                            // re-throw
-                            throw e;
-                        } else {
-                            break;
-                        }
-                    }
-                    // Successful accept, reset the error delay
-                    errorDelay = 0;
-
-                    // Configure the socket
-                    if (running && !paused) {
-                        // setSocketOptions() will hand the socket off to
-                        // an appropriate processor if successful
-                        if (!setSocketOptions(socket)) {
-                            closeSocket(socket);
-                       }
-                    } else {
-                        closeSocket(socket);
+                        // This will not block
+                        countUpOrAwaitConnection();
+                    } catch (InterruptedException e) {
+                        // Ignore
                     }
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    log.error(sm.getString("endpoint.accept.fail"), t);
+                    serverSock.accept(null, this);
+                } else {
+                    // Accept again on a new thread since countUpOrAwaitConnection may block
+                    getExecutor().execute(this);
+                }
+                if (!setSocketOptions(socket)) {
+                    closeSocket(socket);
+                }
+            } else {
+                if (isRunning()) {
+                    state = AcceptorState.PAUSED;
                 }
+                destroySocket(socket);
             }
-            state = AcceptorState.ENDED;
         }
 
-
-        private void closeSocket(AsynchronousSocketChannel socket) {
-            countDownConnection();
-            try {
-                socket.close();
-            } catch (IOException ioe) {
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.err.close"), ioe);
-                }
+        @Override
+        public void failed(Throwable t, Void attachment) {
+            if (isRunning()) {
+                if (!isPaused()) {
+                    if (getMaxConnections() == -1) {
+                        serverSock.accept(null, this);
+                    } else {
+                        // Accept again on a new thread since countUpOrAwaitConnection may block
+                        getExecutor().execute(this);
+                    }
+                } else {
+                    state = AcceptorState.PAUSED;
+                }
+                // We didn't get a socket
+                countDownConnection();
+                // Introduce delay if necessary
+                errorDelay = handleExceptionWithDelay(errorDelay);
+                ExceptionUtils.handleThrowable(t);
+                log.error(sm.getString("endpoint.accept.fail"), t);
+            } else {
+                // We didn't get a socket
+                countDownConnection();
             }
         }
+
     }
 
     public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {
@@ -492,13 +516,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         private SendfileData sendfileData = null;
 
         private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
-        private final Semaphore readPending = new Semaphore(1);
         private boolean readInterest = false; // Guarded by readCompletionHandler
         private boolean readNotify = false;
 
         private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler;
         private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler;
-        private final Semaphore writePending = new Semaphore(1);
         private boolean writeInterest = false; // Guarded by writeCompletionHandler
         private boolean writeNotify = false;
 
@@ -586,7 +608,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
         public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
             super(channel, endpoint);
-            nioChannels = endpoint.nioChannels;
+            nioChannels = endpoint.getNioChannels();
             socketBufferHandler = channel.getBufHandler();
 
             this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@@ -928,24 +950,14 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         @Override
         protected void doClose() {
             if (log.isDebugEnabled()) {
-                log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])", new Exception());
+                log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
             }
             try {
-                getEndpoint().getHandler().release(this);
-            } catch (Throwable e) {
-                ExceptionUtils.handleThrowable(e);
-                if (log.isDebugEnabled()) {
-                    log.error("Channel close error", e);
-                }
-            }
-            try {
-                synchronized (getSocket()) {
-                    getEndpoint().countDownConnection();
-                    if (getSocket().isOpen()) {
-                        getSocket().close(true);
-                    }
+                getEndpoint().connections.remove(getSocket().getIOChannel());
+                if (getSocket().isOpen()) {
+                    getSocket().close(true);
                 }
-                if (getEndpoint().running && !getEndpoint().paused) {
+                if (getEndpoint().running) {
                     if (nioChannels == null || !nioChannels.push(getSocket())) {
                         getSocket().free();
                     }
diff --git a/java/org/apache/tomcat/util/net/NioBlockingSelector.java b/java/org/apache/tomcat/util/net/NioBlockingSelector.java
deleted file mode 100644
index 3a629d3..0000000
--- a/java/org/apache/tomcat/util/net/NioBlockingSelector.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.util.net;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.ExceptionUtils;
-import org.apache.tomcat.util.collections.SynchronizedQueue;
-import org.apache.tomcat.util.collections.SynchronizedStack;
-import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
-
-public class NioBlockingSelector {
-
-    private static final Log log = LogFactory.getLog(NioBlockingSelector.class);
-
-    private static AtomicInteger threadCounter = new AtomicInteger(0);
-
-    private final SynchronizedStack<KeyReference> keyReferenceStack =
-            new SynchronizedStack<>();
-
-    protected Selector sharedSelector;
-
-    protected BlockPoller poller;
-
-    public void open(Selector selector) {
-        sharedSelector = selector;
-        poller = new BlockPoller();
-        poller.selector = sharedSelector;
-        poller.setDaemon(true);
-        poller.setName("NioBlockingSelector.BlockPoller-"+(threadCounter.getAndIncrement()));
-        poller.start();
-    }
-
-    public void close() {
-        if (poller != null) {
-            poller.disable();
-            poller.interrupt();
-            poller = null;
-        }
-    }
-
-    /**
-     * Performs a blocking write using the bytebuffer for data to be written
-     * If the <code>selector</code> parameter is null, then it will perform a busy write that could
-     * take up a lot of CPU cycles.
-     *
-     * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
-     * @param socket SocketChannel - the socket to write data to
-     * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout
-     * @return the number of bytes written
-     * @throws EOFException if write returns -1
-     * @throws SocketTimeoutException if the write times out
-     * @throws IOException if an IO Exception occurs in the underlying socket logic
-     */
-    public int write(ByteBuffer buf, NioChannel socket, long writeTimeout)
-            throws IOException {
-        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-        if (key == null) {
-            throw new IOException("Key no longer registered");
-        }
-        KeyReference reference = keyReferenceStack.pop();
-        if (reference == null) {
-            reference = new KeyReference();
-        }
-        NioSocketWrapper att = (NioSocketWrapper) key.attachment();
-        if (att == null) {
-            throw new IOException();
-        }
-        if (att.previousIOException != null) {
-            /*
-             * Socket has previously seen an IOException on write.
-             *
-             * Blocking writes assume that buffer is always fully written so
-             * there is no code checking for incomplete writes, retaining
-             * the unwritten data and attempting to write it as part of a
-             * subsequent write call.
-             *
-             * Because of the above, when an IOException is triggered we
-             * need so skip subsequent attempts to write as otherwise it
-             * will appear to the client as if some data was dropped just
-             * before the connection is lost. It is better if the client
-             * just sees the dropped connection.
-             */
-            throw new IOException(att.previousIOException);
-        }
-        int written = 0;
-        boolean timedout = false;
-        int keycount = 1; //assume we can write
-        long time = System.currentTimeMillis(); //start the timeout timer
-        try {
-            while (!timedout && buf.hasRemaining()) {
-                if (keycount > 0) { //only write if we were registered for a write
-                    int cnt = socket.write(buf); //write the data
-                    if (cnt == -1) {
-                        throw new EOFException();
-                    }
-                    written += cnt;
-                    if (cnt > 0) {
-                        time = System.currentTimeMillis(); //reset our timeout timer
-                        continue; //we successfully wrote, try again without a selector
-                    }
-                }
-                try {
-                    if (att.getWriteLatch() == null || att.getWriteLatch().getCount() == 0) {
-                        att.startWriteLatch(1);
-                    }
-                    poller.add(att, SelectionKey.OP_WRITE, reference);
-                    if (writeTimeout < 0) {
-                        att.awaitWriteLatch(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
-                    } else {
-                        att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
-                    }
-                } catch (InterruptedException ignore) {
-                    // Ignore
-                }
-                if (att.getWriteLatch() != null && att.getWriteLatch().getCount() > 0) {
-                    //we got interrupted, but we haven't received notification from the poller.
-                    keycount = 0;
-                } else {
-                    //latch countdown has happened
-                    keycount = 1;
-                    att.resetWriteLatch();
-                }
-
-                if (writeTimeout > 0 && (keycount == 0)) {
-                    timedout = (System.currentTimeMillis() - time) >= writeTimeout;
-                }
-            }
-            if (timedout) {
-                att.previousIOException = new SocketTimeoutException();
-                throw att.previousIOException;
-            }
-        } finally {
-            poller.remove(att, SelectionKey.OP_WRITE);
-            if (timedout && reference.key != null) {
-                poller.cancelKey(reference.key);
-            }
-            reference.key = null;
-            keyReferenceStack.push(reference);
-        }
-        return written;
-    }
-
-    /**
-     * Performs a blocking read using the bytebuffer for data to be read
-     * If the <code>selector</code> parameter is null, then it will perform a busy read that could
-     * take up a lot of CPU cycles.
-     *
-     * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
-     * @param socket SocketChannel - the socket to write data to
-     * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
-     * @return the number of bytes read
-     * @throws EOFException if read returns -1
-     * @throws SocketTimeoutException if the read times out
-     * @throws IOException if an IO Exception occurs in the underlying socket logic
-     */
-    public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
-        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-        if (key == null) {
-            throw new IOException("Key no longer registered");
-        }
-        KeyReference reference = keyReferenceStack.pop();
-        if (reference == null) {
-            reference = new KeyReference();
-        }
-        NioSocketWrapper att = (NioSocketWrapper) key.attachment();
-        int read = 0;
-        boolean timedout = false;
-        int keycount = 1; //assume we can read
-        long time = System.currentTimeMillis(); //start the timeout timer
-        try {
-            while (!timedout) {
-                if (keycount > 0) { //only read if we were registered for a read
-                    read = socket.read(buf);
-                    if (read != 0) {
-                        break;
-                    }
-                }
-                try {
-                    if (att.getReadLatch()==null || att.getReadLatch().getCount()==0) {
-                        att.startReadLatch(1);
-                    }
-                    poller.add(att,SelectionKey.OP_READ, reference);
-                    if (readTimeout < 0) {
-                        att.awaitReadLatch(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-                    } else {
-                        att.awaitReadLatch(readTimeout, TimeUnit.MILLISECONDS);
-                    }
-                } catch (InterruptedException ignore) {
-                    // Ignore
-                }
-                if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
-                    //we got interrupted, but we haven't received notification from the poller.
-                    keycount = 0;
-                }else {
-                    //latch countdown has happened
-                    keycount = 1;
-                    att.resetReadLatch();
-                }
-                if (readTimeout >= 0 && (keycount == 0)) {
-                    timedout = (System.currentTimeMillis() - time) >= readTimeout;
-                }
-            }
-            if (timedout) {
-                throw new SocketTimeoutException();
-            }
-        } finally {
-            poller.remove(att,SelectionKey.OP_READ);
-            if (timedout && reference.key != null) {
-                poller.cancelKey(reference.key);
-            }
-            reference.key = null;
-            keyReferenceStack.push(reference);
-        }
-        return read;
-    }
-
-
-    protected static class BlockPoller extends Thread {
-        protected volatile boolean run = true;
-        protected Selector selector = null;
-        protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>();
-        public void disable() { run = false; selector.wakeup();}
-        protected final AtomicInteger wakeupCounter = new AtomicInteger(0);
-
-        public void cancelKey(final SelectionKey key) {
-            Runnable r = new RunnableCancel(key);
-            events.offer(r);
-            wakeup();
-        }
-
-        public void wakeup() {
-            if (wakeupCounter.addAndGet(1)==0) {
-                selector.wakeup();
-            }
-        }
-
-        public void cancel(SelectionKey sk, NioSocketWrapper key, int ops){
-            if (sk!=null) {
-                sk.cancel();
-                sk.attach(null);
-                if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) {
-                    countDown(key.getWriteLatch());
-                }
-                if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ)) {
-                    countDown(key.getReadLatch());
-                }
-            }
-        }
-
-        public void add(final NioSocketWrapper key, final int ops, final KeyReference ref) {
-            if ( key == null ) {
-                return;
-            }
-            NioChannel nch = key.getSocket();
-            final SocketChannel ch = nch.getIOChannel();
-            if ( ch == null ) {
-                return;
-            }
-
-            Runnable r = new RunnableAdd(ch, key, ops, ref);
-            events.offer(r);
-            wakeup();
-        }
-
-        public void remove(final NioSocketWrapper key, final int ops) {
-            if ( key == null ) {
-                return;
-            }
-            NioChannel nch = key.getSocket();
-            final SocketChannel ch = nch.getIOChannel();
-            if ( ch == null ) {
-                return;
-            }
-
-            Runnable r = new RunnableRemove(ch, key, ops);
-            events.offer(r);
-            wakeup();
-        }
-
-        public boolean events() {
-            Runnable r = null;
-
-            /* We only poll and run the runnable events when we start this
-             * method. Further events added to the queue later will be delayed
-             * to the next execution of this method.
-             *
-             * We do in this way, because running event from the events queue
-             * may lead the working thread to add more events to the queue (for
-             * example, the worker thread may add another RunnableAdd event when
-             * waken up by a previous RunnableAdd event who got an invalid
-             * SelectionKey). Trying to consume all the events in an increasing
-             * queue till it's empty, will make the loop hard to be terminated,
-             * which will kill a lot of time, and greatly affect performance of
-             * the poller loop.
-             */
-            int size = events.size();
-            for (int i = 0; i < size && (r = events.poll()) != null; i++) {
-                r.run();
-            }
-
-            return (size > 0);
-        }
-
-        @Override
-        public void run() {
-            while (run) {
-                try {
-                    events();
-                    int keyCount = 0;
-                    try {
-                        if (wakeupCounter.getAndSet(-1) > 0) {
-                            keyCount = selector.selectNow();
-                        } else {
-                            keyCount = selector.select(1000);
-                        }
-                        wakeupCounter.set(0);
-                        if (!run) {
-                            break;
-                        }
-                    }catch ( NullPointerException x ) {
-                        //sun bug 5076772 on windows JDK 1.5
-                        if (selector==null) {
-                            throw x;
-                        }
-                        if ( log.isDebugEnabled() ) {
-                            log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
-                        }
-                        continue;
-                    } catch ( CancelledKeyException x ) {
-                        //sun bug 5076772 on windows JDK 1.5
-                        if ( log.isDebugEnabled() ) {
-                            log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
-                        }
-                        continue;
-                    } catch (Throwable x) {
-                        ExceptionUtils.handleThrowable(x);
-                        log.error("",x);
-                        continue;
-                    }
-
-                    Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
-
-                    // Walk through the collection of ready keys and dispatch
-                    // any active event.
-                    while (run && iterator != null && iterator.hasNext()) {
-                        SelectionKey sk = iterator.next();
-                        NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
-                        try {
-                            iterator.remove();
-                            sk.interestOps(sk.interestOps() & (~sk.readyOps()));
-                            if ( sk.isReadable() ) {
-                                countDown(attachment.getReadLatch());
-                            }
-                            if (sk.isWritable()) {
-                                countDown(attachment.getWriteLatch());
-                            }
-                        }catch (CancelledKeyException ckx) {
-                            sk.cancel();
-                            countDown(attachment.getReadLatch());
-                            countDown(attachment.getWriteLatch());
-                        }
-                    }//while
-                }catch ( Throwable t ) {
-                    log.error("",t);
-                }
-            }
-            events.clear();
-            // If using a shared selector, the NioSelectorPool will also try and
-            // close the selector. Try and avoid the ClosedSelectorException
-            // although because multiple threads are involved there is always
-            // the possibility of an Exception here.
-            if (selector.isOpen()) {
-                try {
-                    // Cancels all remaining keys
-                    selector.selectNow();
-                }catch( Exception ignore ) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("",ignore);
-                    }
-                }
-            }
-            try {
-                selector.close();
-            }catch( Exception ignore ) {
-                if (log.isDebugEnabled()) {
-                    log.debug("",ignore);
-                }
-            }
-        }
-
-        public void countDown(CountDownLatch latch) {
-            if ( latch == null ) {
-                return;
-            }
-            latch.countDown();
-        }
-
-
-        private class RunnableAdd implements Runnable {
-
-            private final SocketChannel ch;
-            private final NioSocketWrapper key;
-            private final int ops;
-            private final KeyReference ref;
-
-            public RunnableAdd(SocketChannel ch, NioSocketWrapper key, int ops, KeyReference ref) {
-                this.ch = ch;
-                this.key = key;
-                this.ops = ops;
-                this.ref = ref;
-            }
-
-            @Override
-            public void run() {
-                SelectionKey sk = ch.keyFor(selector);
-                try {
-                    if (sk == null) {
-                        sk = ch.register(selector, ops, key);
-                        ref.key = sk;
-                    } else if (!sk.isValid()) {
-                        cancel(sk, key, ops);
-                    } else {
-                        sk.interestOps(sk.interestOps() | ops);
-                    }
-                } catch (CancelledKeyException cx) {
-                    cancel(sk, key, ops);
-                } catch (ClosedChannelException cx) {
-                    cancel(null, key, ops);
-                }
-            }
-        }
-
-
-        private class RunnableRemove implements Runnable {
-
-            private final SocketChannel ch;
-            private final NioSocketWrapper key;
-            private final int ops;
-
-            public RunnableRemove(SocketChannel ch, NioSocketWrapper key, int ops) {
-                this.ch = ch;
-                this.key = key;
-                this.ops = ops;
-            }
-
-            @Override
-            public void run() {
-                SelectionKey sk = ch.keyFor(selector);
-                try {
-                    if (sk == null) {
-                        if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) {
-                            countDown(key.getWriteLatch());
-                        }
-                        if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ)) {
-                            countDown(key.getReadLatch());
-                        }
-                    } else {
-                        if (sk.isValid()) {
-                            sk.interestOps(sk.interestOps() & (~ops));
-                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) {
-                                countDown(key.getWriteLatch());
-                            }
-                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ)) {
-                                countDown(key.getReadLatch());
-                            }
-                            if (sk.interestOps()==0) {
-                                sk.cancel();
-                                sk.attach(null);
-                            }
-                        }else {
-                            sk.cancel();
-                            sk.attach(null);
-                        }
-                    }
-                }catch (CancelledKeyException cx) {
-                    if (sk!=null) {
-                        sk.cancel();
-                        sk.attach(null);
-                    }
-                }
-            }
-
-        }
-
-
-        public static class RunnableCancel implements Runnable {
-
-            private final SelectionKey key;
-
-            public RunnableCancel(SelectionKey key) {
-                this.key = key;
-            }
-
-            @Override
-            public void run() {
-                key.cancel();
-            }
-        }
-    }
-
-
-    public static class KeyReference {
-        SelectionKey key = null;
-
-        @Override
-        public void finalize() {
-            if (key!=null && key.isValid()) {
-                log.warn("Possible key leak, cancelling key in the finalizer.");
-                try {key.cancel();}catch (Exception ignore){}
-            }
-        }
-    }
-}
diff --git a/java/org/apache/tomcat/util/net/NioChannel.java b/java/org/apache/tomcat/util/net/NioChannel.java
index b718e39..ac46d76 100644
--- a/java/org/apache/tomcat/util/net/NioChannel.java
+++ b/java/org/apache/tomcat/util/net/NioChannel.java
@@ -21,12 +21,10 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
-import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 
 import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
-import org.apache.tomcat.util.net.NioEndpoint.Poller;
 import org.apache.tomcat.util.res.StringManager;
 
 /**
@@ -42,10 +40,7 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
 
     protected final SocketBufferHandler bufHandler;
     protected SocketChannel sc = null;
-    protected SocketWrapperBase<NioChannel> socketWrapper = null;
-
-
-    protected Poller poller;
+    protected NioSocketWrapper socketWrapper = null;
 
     public NioChannel(SocketBufferHandler bufHandler) {
         this.bufHandler = bufHandler;
@@ -67,14 +62,10 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
     /**
      * @return the socketWrapper
      */
-    SocketWrapperBase<NioChannel> getSocketWrapper() {
+    NioSocketWrapper getSocketWrapper() {
         return socketWrapper;
     }
 
-    void setSocketWrapper(SocketWrapperBase<NioChannel> socketWrapper) {
-        this.socketWrapper = socketWrapper;
-    }
-
     /**
      * Free the channel memory
      */
@@ -180,22 +171,10 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
         return sc.read(dsts, offset, length);
     }
 
-    public Object getAttachment() {
-        Poller pol = getPoller();
-        Selector sel = pol!=null?pol.getSelector():null;
-        SelectionKey key = sel!=null?getIOChannel().keyFor(sel):null;
-        Object att = key!=null?key.attachment():null;
-        return att;
-    }
-
     public SocketBufferHandler getBufHandler() {
         return bufHandler;
     }
 
-    public Poller getPoller() {
-        return poller;
-    }
-
     public SocketChannel getIOChannel() {
         return sc;
     }
@@ -221,10 +200,6 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
         return 0;
     }
 
-    public void setPoller(Poller poller) {
-        this.poller = poller;
-    }
-
     @Override
     public String toString() {
         return super.toString() + ":" + sc.toString();
@@ -285,6 +260,10 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
         public void free() {
         }
         @Override
+        protected ApplicationBufferHandler getAppReadBufHandler() {
+            return ApplicationBufferHandler.EMPTY;
+        }
+        @Override
         public void setAppReadBufHandler(ApplicationBufferHandler handler) {
         }
         @Override
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 3e929d2..8c8fdf2 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -22,7 +22,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
@@ -49,7 +48,6 @@ import javax.net.ssl.SSLEngine;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.ExceptionUtils;
-import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.tomcat.util.collections.SynchronizedQueue;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.compat.JreCompat;
@@ -83,8 +81,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
 
     // ----------------------------------------------------------------- Fields
 
-    private NioSelectorPool selectorPool = new NioSelectorPool();
-
     /**
      * Server socket "pointer".
      */
@@ -112,31 +108,13 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     // ------------------------------------------------------------- Properties
 
     /**
-     * Generic properties, introspected
-     */
-    @Override
-    public boolean setProperty(String name, String value) {
-        final String selectorPoolName = "selectorPool.";
-        try {
-            if (name.startsWith(selectorPoolName)) {
-                return IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value);
-            } else {
-                return super.setProperty(name, value);
-            }
-        }catch ( Exception x ) {
-            log.error("Unable to set attribute \""+name+"\" to \""+value+"\"",x);
-            return false;
-        }
-    }
-
-
-    /**
      * Use System.inheritableChannel to obtain channel from stdin/stdout.
      */
     private boolean useInheritedChannel = false;
     public void setUseInheritedChannel(boolean useInheritedChannel) { this.useInheritedChannel = useInheritedChannel; }
     public boolean getUseInheritedChannel() { return useInheritedChannel; }
 
+
     /**
      * Priority of the poller threads.
      */
@@ -185,14 +163,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     }
 
 
-    public void setSelectorPool(NioSelectorPool selectorPool) {
-        this.selectorPool = selectorPool;
-    }
-
-    public void setSocketProperties(SocketProperties socketProperties) {
-        this.socketProperties = socketProperties;
-    }
-
     /**
      * Is deferAccept supported?
      */
@@ -233,8 +203,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
 
         // Initialize SSL if needed
         initialiseSsl();
-
-        selectorPool.open();
     }
 
     // Separated out to make it easier for folks that extend NioEndpoint to
@@ -296,7 +264,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
             pollerThread.setDaemon(true);
             pollerThread.start();
 
-            startAcceptorThreads();
+            startAcceptorThread();
         }
     }
 
@@ -306,13 +274,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
      */
     @Override
     public void stopInternal() {
-        releaseConnectionLatch();
         if (!paused) {
             pause();
         }
         if (running) {
             running = false;
-            unlockAccept();
+            acceptor.stop(10);
             if (poller != null) {
                 poller.destroy();
                 poller = null;
@@ -366,7 +333,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         if (getHandler() != null ) {
             getHandler().recycle();
         }
-        selectorPool.close();
         if (log.isDebugEnabled()) {
             log.debug("Destroy completed for " +
                     new InetSocketAddress(getAddress(), getPort()));
@@ -378,7 +344,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     protected void doCloseServerSocket() throws IOException {
         if (!getUseInheritedChannel() && serverSock != null) {
             // Close server socket
-            serverSock.socket().close();
             serverSock.close();
         }
         serverSock = null;
@@ -388,22 +353,13 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     // ------------------------------------------------------ Protected Methods
 
 
-    public int getWriteBufSize() {
-        return socketProperties.getTxBufSize();
-    }
-
-    public int getReadBufSize() {
-        return socketProperties.getRxBufSize();
-    }
-
-    public NioSelectorPool getSelectorPool() {
-        return selectorPool;
+    protected SynchronizedStack<NioChannel> getNioChannels() {
+        return nioChannels;
     }
 
 
-    @Override
-    protected AbstractEndpoint.Acceptor createAcceptor() {
-        return new Acceptor();
+    protected Poller getPoller() {
+        return poller;
     }
 
 
@@ -424,14 +380,11 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
      *  and processing may continue, <code>false</code> if the socket needs to be
      *  close immediately
      */
+    @Override
     protected boolean setSocketOptions(SocketChannel socket) {
-        // Process the connection
+        NioSocketWrapper socketWrapper = null;
         try {
-            //disable blocking, APR style, we are gonna be polling it
-            socket.configureBlocking(false);
-            Socket sock = socket.socket();
-            socketProperties.setProperties(sock);
-
+            // Allocate channel and wrapper
             NioChannel channel = null;
             if (nioChannels != null) {
                 channel = nioChannels.pop();
@@ -442,12 +395,25 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                         socketProperties.getAppWriteBufSize(),
                         socketProperties.getDirectBuffer());
                 if (isSSLEnabled()) {
-                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
+                    channel = new SecureNioChannel(bufhandler, this);
                 } else {
                     channel = new NioChannel(bufhandler);
                 }
             }
-            poller.register(channel);
+            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
+            channel.reset(socket, newWrapper);
+            connections.put(socket, newWrapper);
+            socketWrapper = newWrapper;
+
+            // Set socket properties
+            // Disable blocking, polling will be used
+            socket.configureBlocking(false);
+            socketProperties.setProperties(socket.socket());
+
+            socketWrapper.setReadTimeout(getConnectionTimeout());
+            socketWrapper.setWriteTimeout(getConnectionTimeout());
+            socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
+            poller.register(socketWrapper);
             return true;
         } catch (Throwable t) {
             ExceptionUtils.handleThrowable(t);
@@ -456,6 +422,9 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
             } catch (Throwable tt) {
                 ExceptionUtils.handleThrowable(tt);
             }
+            if (socketWrapper == null) {
+                destroySocket(socket);
+            }
         }
         // Tell to close the socket if needed
         return false;
@@ -463,8 +432,15 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
 
 
     @Override
-    protected Log getLog() {
-        return log;
+    protected void destroySocket(SocketChannel socket) {
+        countDownConnection();
+        try {
+            socket.close();
+        } catch (IOException ioe) {
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("endpoint.err.close"), ioe);
+            }
+        }
     }
 
 
@@ -474,131 +450,29 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     }
 
 
-    // --------------------------------------------------- Acceptor Inner Class
-    /**
-     * The background thread that listens for incoming TCP/IP connections and
-     * hands them off to an appropriate processor.
-     */
-    protected class Acceptor extends AbstractEndpoint.Acceptor {
-
-        @Override
-        public void run() {
-
-            int errorDelay = 0;
-            long pauseStart = 0;
-
-            // Loop until we receive a shutdown command
-            while (running) {
-
-                // Loop if endpoint is paused.
-                // There are two likely scenarios here.
-                // The first scenario is that Tomcat is shutting down. In this
-                // case - and particularly for the unit tests - we want to exit
-                // this loop as quickly as possible. The second scenario is a
-                // genuine pause of the connector. In this case we want to avoid
-                // excessive CPU usage.
-                // Therefore, we start with a tight loop but if there isn't a
-                // rapid transition to stop then sleeps are introduced.
-                // < 1ms       - tight loop
-                // 1ms to 10ms - 1ms sleep
-                // > 10ms      - 10ms sleep
-                 while (paused && running) {
-                     if (state != AcceptorState.PAUSED) {
-                         pauseStart = System.nanoTime();
-                         // Entered pause state
-                         state = AcceptorState.PAUSED;
-                     }
-                     if ((System.nanoTime() - pauseStart) > 1_000_000) {
-                         // Paused for more than 1ms
-                         try {
-                             if ((System.nanoTime() - pauseStart) > 10_000_000) {
-                                 Thread.sleep(10);
-                             } else {
-                                 Thread.sleep(1);
-                             }
-                         } catch (InterruptedException e) {
-                             // Ignore
-                         }
-                     }
-                }
-
-                if (!running) {
-                    break;
-                }
-                state = AcceptorState.RUNNING;
-
-                try {
-                    //if we have reached max connections, wait
-                    countUpOrAwaitConnection();
+    @Override
+    protected SocketChannel serverSocketAccept() throws Exception {
+        SocketChannel result = serverSock.accept();
 
-                    SocketChannel socket = null;
-                    try {
-                        // Accept the next incoming connection from the server
-                        // socket
-                        socket = serverSock.accept();
-
-                        // Bug does not affect Windows. Skip the check on that platform.
-                        if (!JrePlatform.IS_WINDOWS) {
-                            SocketAddress currentRemoteAddress = socket.getRemoteAddress();
-                            long currentNanoTime = System.nanoTime();
-                            if (currentRemoteAddress.equals(previousAcceptedSocketRemoteAddress) &&
-                                    currentNanoTime - previousAcceptedSocketNanoTime < 1000) {
-                                throw new IOException(sm.getString("endpoint.err.duplicateAccept"));
-                            }
-                            previousAcceptedSocketRemoteAddress = currentRemoteAddress;
-                            previousAcceptedSocketNanoTime = currentNanoTime;
-                        }
-                    } catch (IOException ioe) {
-                        // We didn't get a socket
-                        countDownConnection();
-                        if (running) {
-                            // Introduce delay if necessary
-                            errorDelay = handleExceptionWithDelay(errorDelay);
-                            // re-throw
-                            throw ioe;
-                        } else {
-                            break;
-                        }
-                    }
-                    // Successful accept, reset the error delay
-                    errorDelay = 0;
-
-                    // Configure the socket
-                    if (running && !paused) {
-                        // setSocketOptions() will hand the socket off to
-                        // an appropriate processor if successful
-                        if (!setSocketOptions(socket)) {
-                            closeSocket(socket);
-                        }
-                    } else {
-                        closeSocket(socket);
-                    }
-                } catch (Throwable t) {
-                    ExceptionUtils.handleThrowable(t);
-                    log.error(sm.getString("endpoint.accept.fail"), t);
-                }
+        // Bug does not affect Windows. Skip the check on that platform.
+        if (!JrePlatform.IS_WINDOWS) {
+            SocketAddress currentRemoteAddress = result.getRemoteAddress();
+            long currentNanoTime = System.nanoTime();
+            if (currentRemoteAddress.equals(previousAcceptedSocketRemoteAddress) &&
+                    currentNanoTime - previousAcceptedSocketNanoTime < 1000) {
+                throw new IOException(sm.getString("endpoint.err.duplicateAccept"));
             }
-            state = AcceptorState.ENDED;
+            previousAcceptedSocketRemoteAddress = currentRemoteAddress;
+            previousAcceptedSocketNanoTime = currentNanoTime;
         }
 
+        return result;
+    }
 
-        private void closeSocket(SocketChannel socket) {
-            countDownConnection();
-            try {
-                socket.socket().close();
-            } catch (IOException ioe)  {
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.err.close"), ioe);
-                }
-            }
-            try {
-                socket.close();
-            } catch (IOException ioe) {
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.err.close"), ioe);
-                }
-            }
-        }
+
+    @Override
+    protected Log getLog() {
+        return log;
     }
 
 
@@ -730,7 +604,10 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                 NioSocketWrapper socketWrapper = pe.getSocketWrapper();
                 SocketChannel sc = socketWrapper.getSocket().getIOChannel();
                 int interestOps = pe.getInterestOps();
-                if (interestOps == OP_REGISTER) {
+                if (sc == null) {
+                    log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
+                    socketWrapper.close();
+                } else if (interestOps == OP_REGISTER) {
                     try {
                         sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
                     } catch (Exception x) {
@@ -744,8 +621,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                         // processed. Count down the connections at this point
                         // since it won't have been counted down when the socket
                         // closed.
-                        socketWrapper.getEndpoint().countDownConnection();
-                        ((NioSocketWrapper) socketWrapper).closed.set(true);
+                        socketWrapper.close();
                     } else {
                         final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
                         if (attachment != null) {
@@ -774,51 +650,45 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         /**
          * Registers a newly created socket with the poller.
          *
-         * @param socket    The newly created socket
+         * @param socketWrapper The socket wrapper
          */
-        public void register(final NioChannel socket) {
-            socket.setPoller(this);
-            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
-            socket.setSocketWrapper(ka);
-            ka.setPoller(this);
-            ka.setReadTimeout(getSocketProperties().getSoTimeout());
-            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
-            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
-            ka.setReadTimeout(getConnectionTimeout());
-            ka.setWriteTimeout(getConnectionTimeout());
-            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
+        public void register(final NioSocketWrapper socketWrapper) {
+            socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
             PollerEvent event = null;
             if (eventCache != null) {
                 event = eventCache.pop();
             }
             if (event == null) {
-                event = new PollerEvent(ka, OP_REGISTER);
+                event = new PollerEvent(socketWrapper, OP_REGISTER);
             } else {
-                event.reset(ka, OP_REGISTER);
+                event.reset(socketWrapper, OP_REGISTER);
             }
             addEvent(event);
         }
 
         public void cancelledKey(SelectionKey sk, SocketWrapperBase<NioChannel> socketWrapper) {
-            try {
-                if (socketWrapper != null) {
-                    socketWrapper.close();
-                }
-                if (sk != null) {
-                    sk.attach(null);
-                    if (sk.isValid()) {
-                        sk.cancel();
+            if (JreCompat.isJre11Available() && socketWrapper != null) {
+                socketWrapper.close();
+            } else {
+                try {
+                    // It is important to cancel the key first, otherwise a deadlock may occur between the
+                    // poller select and the socket channel close which would cancel the key
+                    // This workaround is not needed on Java 11+
+                    if (sk != null) {
+                        sk.attach(null);
+                        if (sk.isValid()) {
+                            sk.cancel();
+                        }
                     }
-                    // The SocketChannel is also available via the SelectionKey. If
-                    // it hasn't been closed in the block above, close it now.
-                    if (sk.channel().isOpen()) {
-                        sk.channel().close();
+                } catch (Throwable e) {
+                    ExceptionUtils.handleThrowable(e);
+                    if (log.isDebugEnabled()) {
+                        log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
+                    }
+                } finally {
+                    if (socketWrapper != null) {
+                        socketWrapper.close();
                     }
-                }
-            } catch (Throwable e) {
-                ExceptionUtils.handleThrowable(e);
-                if (log.isDebugEnabled()) {
-                    log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
                 }
             }
         }
@@ -893,21 +763,39 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
             try {
                 if (close) {
                     cancelledKey(sk, socketWrapper);
-                } else if (sk.isValid() && socketWrapper != null ) {
-                    if (sk.isReadable() || sk.isWritable() ) {
-                        if (socketWrapper.getSendfileData() != null ) {
+                } else if (sk.isValid()) {
+                    if (sk.isReadable() || sk.isWritable()) {
+                        if (socketWrapper.getSendfileData() != null) {
                             processSendfile(sk, socketWrapper, false);
                         } else {
                             unreg(sk, socketWrapper, sk.readyOps());
                             boolean closeSocket = false;
                             // Read goes before write
                             if (sk.isReadable()) {
-                                if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
+                                if (socketWrapper.readOperation != null) {
+                                    if (!socketWrapper.readOperation.process()) {
+                                        closeSocket = true;
+                                    }
+                                } else if (socketWrapper.readBlocking) {
+                                    synchronized (socketWrapper.readLock) {
+                                        socketWrapper.readBlocking = false;
+                                        socketWrapper.readLock.notify();
+                                    }
+                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                     closeSocket = true;
                                 }
                             }
                             if (!closeSocket && sk.isWritable()) {
-                                if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
+                                if (socketWrapper.writeOperation != null) {
+                                    if (!socketWrapper.writeOperation.process()) {
+                                        closeSocket = true;
+                                    }
+                                } else if (socketWrapper.writeBlocking) {
+                                    synchronized (socketWrapper.writeLock) {
+                                        socketWrapper.writeBlocking = false;
+                                        socketWrapper.writeLock.notify();
+                                    }
+                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                                     closeSocket = true;
                                 }
                             }
@@ -1074,7 +962,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                             key.interestOps(0);
                             // Avoid duplicate stop calls
                             socketWrapper.interestOps(0);
-                            processKey(key, socketWrapper);
+                            cancelledKey(key, socketWrapper);
                         } else if ((socketWrapper.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                                   (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                             boolean readTimeout = false;
@@ -1139,61 +1027,31 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
 
     public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
 
-        private final NioSelectorPool pool;
         private final SynchronizedStack<NioChannel> nioChannels;
+        private final Poller poller;
 
-        private Poller poller = null;
         private int interestOps = 0;
-        private CountDownLatch readLatch = null;
-        private CountDownLatch writeLatch = null;
         private volatile SendfileData sendfileData = null;
         private volatile long lastRead = System.currentTimeMillis();
         private volatile long lastWrite = lastRead;
 
+        private final Object readLock;
+        private volatile boolean readBlocking = false;
+        private final Object writeLock;
+        private volatile boolean writeBlocking = false;
+
         public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
             super(channel, endpoint);
-            pool = endpoint.getSelectorPool();
+            nioChannels = endpoint.getNioChannels();
+            poller = endpoint.getPoller();
             socketBufferHandler = channel.getBufHandler();
-            nioChannels = endpoint.nioChannels;
+            readLock = (readPending == null) ? new Object() : readPending;
+            writeLock = (writePending == null) ? new Object() : writePending;
         }
 
         public Poller getPoller() { return poller; }
-        public void setPoller(Poller poller){this.poller = poller;}
         public int interestOps() { return interestOps; }
         public int interestOps(int ops) { this.interestOps  = ops; return ops; }
-        public CountDownLatch getReadLatch() { return readLatch; }
-        public CountDownLatch getWriteLatch() { return writeLatch; }
-        protected CountDownLatch resetLatch(CountDownLatch latch) {
-            if ( latch==null || latch.getCount() == 0 ) {
-                return null;
-            } else {
-                throw new IllegalStateException("Latch must be at count 0");
-            }
-        }
-        public void resetReadLatch() { readLatch = resetLatch(readLatch); }
-        public void resetWriteLatch() { writeLatch = resetLatch(writeLatch); }
-
-        protected CountDownLatch startLatch(CountDownLatch latch, int cnt) {
-            if ( latch == null || latch.getCount() == 0 ) {
-                return new CountDownLatch(cnt);
-            } else {
-                throw new IllegalStateException("Latch must be at count 0 or null.");
-            }
-        }
-        public void startReadLatch(int cnt) { readLatch = startLatch(readLatch,cnt);}
-        public void startWriteLatch(int cnt) { writeLatch = startLatch(writeLatch,cnt);}
-
-        protected void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException {
-            if ( latch == null ) {
-                throw new IllegalStateException("Latch cannot be null");
-            }
-            // Note: While the return value is ignored if the latch does time
-            //       out, logic further up the call stack will trigger a
-            //       SocketTimeoutException
-            latch.await(timeout,unit);
-        }
-        public void awaitReadLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(readLatch,timeout,unit);}
-        public void awaitWriteLatch(long timeout, TimeUnit unit) throws InterruptedException { awaitLatch(writeLatch,timeout,unit);}
 
         public void setSendfileData(SendfileData sf) { this.sendfileData = sf;}
         public SendfileData getSendfileData() { return this.sendfileData; }
@@ -1294,7 +1152,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                 log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
             }
             try {
-                getEndpoint().countDownConnection();
+                getEndpoint().connections.remove(getSocket().getIOChannel());
                 if (getSocket().isOpen()) {
                     getSocket().close(true);
                 }
@@ -1332,70 +1190,141 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         }
 
 
-        private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
-            int nRead;
-            NioChannel socket = getSocket();
-            if (socket == NioChannel.CLOSED_NIO_CHANNEL) {
+        private int fillReadBuffer(boolean block, ByteBuffer buffer) throws IOException {
+            int n = 0;
+            if (getSocket() == NioChannel.CLOSED_NIO_CHANNEL) {
                 throw new ClosedChannelException();
             }
             if (block) {
-                Selector selector = null;
-                try {
-                    selector = pool.get();
-                } catch (IOException x) {
-                    // Ignore
-                }
-                try {
-                    NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper) socket
-                            .getAttachment();
-                    if (att == null) {
-                        throw new IOException("Key must be cancelled.");
+                long timeout = getReadTimeout();
+                long startNanos = 0;
+                do {
+                    if (startNanos > 0) {
+                        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                        if (elapsedMillis == 0) {
+                            elapsedMillis = 1;
+                        }
+                        timeout -= elapsedMillis;
+                        if (timeout <= 0) {
+                            throw new SocketTimeoutException();
+                        }
                     }
-                    nRead = pool.read(to, socket, selector, att.getReadTimeout());
-                } finally {
-                    if (selector != null) {
-                        pool.put(selector);
+                    n = getSocket().read(buffer);
+                    if (n == -1) {
+                        throw new EOFException();
+                    } else if (n == 0) {
+                        if (!readBlocking) {
+                            readBlocking = true;
+                            registerReadInterest();
+                        }
+                        synchronized (readLock) {
+                            if (readBlocking) {
+                                try {
+                                    if (timeout > 0) {
+                                        startNanos = System.nanoTime();
+                                        readLock.wait(timeout);
+                                    } else {
+                                        readLock.wait();
+                                    }
+                                } catch (InterruptedException e) {
+                                    // Continue
+                                }
+                            }
+                        }
                     }
-                }
+                } while (n == 0); // TLS needs to loop as reading zero application bytes is possible
             } else {
-                nRead = socket.read(to);
-                if (nRead == -1) {
+                n = getSocket().read(buffer);
+                if (n == -1) {
                     throw new EOFException();
                 }
             }
-            return nRead;
+            return n;
         }
 
 
         @Override
-        protected void doWrite(boolean block, ByteBuffer from) throws IOException {
-            long writeTimeout = getWriteTimeout();
-            Selector selector = null;
-            try {
-                selector = pool.get();
-            } catch (IOException x) {
-                // Ignore
+        protected void doWrite(boolean block, ByteBuffer buffer) throws IOException {
+            int n = 0;
+            if (getSocket() == NioChannel.CLOSED_NIO_CHANNEL) {
+                throw new ClosedChannelException();
             }
-            try {
-                pool.write(from, getSocket(), selector, writeTimeout, block);
-                if (block) {
-                    // Make sure we are flushed
-                    do {
-                        if (getSocket().flush(true, selector, writeTimeout)) {
-                            break;
-                        }
-                    } while (true);
-                }
-                updateLastWrite();
-            } finally {
-                if (selector != null) {
-                    pool.put(selector);
+            if (block) {
+                if (previousIOException != null) {
+                    /*
+                     * Socket has previously timed out.
+                     *
+                     * Blocking writes assume that buffer is always fully
+                     * written so there is no code checking for incomplete
+                     * writes, retaining the unwritten data and attempting to
+                     * write it as part of a subsequent write call.
+                     *
+                     * Because of the above, when a timeout is triggered we need
+                     * to skip subsequent attempts to write as otherwise it will
+                     * appear to the client as if some data was dropped just
+                     * before the connection is lost. It is better if the client
+                     * just sees the dropped connection.
+                     */
+                    throw new IOException(previousIOException);
                 }
+                long timeout = getWriteTimeout();
+                long startNanos = 0;
+                do {
+                    if (startNanos > 0) {
+                        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                        if (elapsedMillis == 0) {
+                            elapsedMillis = 1;
+                        }
+                        timeout -= elapsedMillis;
+                        if (timeout <= 0) {
+                            previousIOException = new SocketTimeoutException();
+                            throw previousIOException;
+                        }
+                    }
+                    n = getSocket().write(buffer);
+                    if (n == -1) {
+                        throw new EOFException();
+                    } else if (n == 0 && (buffer.hasRemaining() || getSocket().getOutboundRemaining() > 0)) {
+                        // n == 0 could be an incomplete write but it could also
+                        // indicate that a previous incomplete write of the
+                        // outbound buffer (for TLS) has now completed. Only
+                        // block if there is still data to write.
+                        writeBlocking = true;
+                        registerWriteInterest();
+                        synchronized (writeLock) {
+                            if (writeBlocking) {
+                                try {
+                                    if (timeout > 0) {
+                                        startNanos = System.nanoTime();
+                                        writeLock.wait(timeout);
+                                    } else {
+                                        writeLock.wait();
+                                    }
+                                } catch (InterruptedException e) {
+                                    // Continue
+                                }
+                                writeBlocking = false;
+                            }
+                        }
+                    } else if (startNanos > 0) {
+                        // If something was written, reset timeout
+                        timeout = getWriteTimeout();
+                        startNanos = 0;
+                    }
+                } while (buffer.hasRemaining() || getSocket().getOutboundRemaining() > 0);
+            } else {
+                do {
+                    n = getSocket().write(buffer);
+                    if (n == -1) {
+                        throw new EOFException();
+                    }
+                } while (n > 0 && buffer.hasRemaining());
+                // If there is data left in the buffer the socket will be registered for
+                // write further up the stack. This is to ensure the socket is only
+                // registered for write once as both container and user code can trigger
+                // write registration.
             }
-            // If there is data left in the buffer the socket will be registered for
-            // write further up the stack. This is to ensure the socket is only
-            // registered for write once as both container and user code can trigger
-            // write registration.
+            updateLastWrite();
         }
 
 
@@ -1426,10 +1355,9 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         @Override
         public SendfileState processSendfile(SendfileDataBase sendfileData) {
             setSendfileData((SendfileData) sendfileData);
-            SelectionKey key = getSocket().getIOChannel().keyFor(
-                    getSocket().getPoller().getSelector());
+            SelectionKey key = getSocket().getIOChannel().keyFor(getPoller().getSelector());
             // Might as well do the first write on this thread
-            return getSocket().getPoller().processSendfile(key, this, true);
+            return getPoller().processSendfile(key, this, true);
         }
 
 
diff --git a/java/org/apache/tomcat/util/net/NioSelectorPool.java b/java/org/apache/tomcat/util/net/NioSelectorPool.java
deleted file mode 100644
index 791f54e..0000000
--- a/java/org/apache/tomcat/util/net/NioSelectorPool.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.util.net;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-
-/**
- *
- * Thread safe non blocking selector pool
- * @version 1.0
- * @since 6.0
- */
-
-public class NioSelectorPool {
-
-    public NioSelectorPool() {
-    }
-
-    private static final Log log = LogFactory.getLog(NioSelectorPool.class);
-
-    protected static final boolean SHARED =
-        Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true"));
-
-    protected NioBlockingSelector blockingSelector;
-
-    protected volatile Selector SHARED_SELECTOR;
-
-    protected int maxSelectors = 200;
-    protected long sharedSelectorTimeout = 30000;
-    protected int maxSpareSelectors = -1;
-    protected boolean enabled = true;
-    protected AtomicInteger active = new AtomicInteger(0);
-    protected AtomicInteger spare = new AtomicInteger(0);
-    protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<>();
-
-    protected Selector getSharedSelector() throws IOException {
-        if (SHARED && SHARED_SELECTOR == null) {
-            synchronized ( NioSelectorPool.class ) {
-                if ( SHARED_SELECTOR == null )  {
-                    SHARED_SELECTOR = Selector.open();
-                    log.info("Using a shared selector for servlet write/read");
-                }
-            }
-        }
-        return  SHARED_SELECTOR;
-    }
-
-    public Selector get() throws IOException{
-        if (SHARED) {
-            return getSharedSelector();
-        }
-        if ((!enabled) || active.incrementAndGet() >= maxSelectors) {
-            if (enabled) {
-                active.decrementAndGet();
-            }
-            return null;
-        }
-        Selector s = null;
-        try {
-            s = selectors.size() > 0 ? selectors.poll() : null;
-            if (s == null) {
-                s = Selector.open();
-            } else {
-                spare.decrementAndGet();
-            }
-        } catch (NoSuchElementException x) {
-            try {
-                s = Selector.open();
-            } catch (IOException iox) {
-            }
-        } finally {
-            if (s == null) {
-                active.decrementAndGet();// we were unable to find a selector
-            }
-        }
-        return s;
-    }
-
-
-
-    public void put(Selector s) throws IOException {
-        if (SHARED) {
-            return;
-        }
-        if (enabled) {
-            active.decrementAndGet();
-        }
-        if (enabled && (maxSpareSelectors == -1 || spare.get() < Math.min(maxSpareSelectors, maxSelectors))) {
-            spare.incrementAndGet();
-            selectors.offer(s);
-        } else {
-            s.close();
-        }
-    }
-
-    public void close() throws IOException {
-        enabled = false;
-        Selector s;
-        while ((s = selectors.poll()) != null) {
-            s.close();
-        }
-        spare.set(0);
-        active.set(0);
-        if (blockingSelector != null) {
-            blockingSelector.close();
-        }
-        if (SHARED && getSharedSelector() != null) {
-            getSharedSelector().close();
-            SHARED_SELECTOR = null;
-        }
-    }
-
-    public void open() throws IOException {
-        enabled = true;
-        getSharedSelector();
-        if (SHARED) {
-            blockingSelector = new NioBlockingSelector();
-            blockingSelector.open(getSharedSelector());
-        }
-
-    }
-
-    /**
-     * Performs a write using the bytebuffer for data to be written and a
-     * selector to block (if blocking is requested). If the
-     * <code>selector</code> parameter is null, and blocking is requested then
-     * it will perform a busy write that could take up a lot of CPU cycles.
-     * @param buf           The buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
-     * @param socket        The socket to write data to
-     * @param selector      The selector to use for blocking, if null then a busy write will be initiated
-     * @param writeTimeout  The timeout for this write operation in milliseconds, -1 means no timeout
-     * @param block         <code>true</code> to perform a blocking write
-     *                      otherwise a non-blocking write will be performed
-     * @return int - returns the number of bytes written
-     * @throws EOFException if write returns -1
-     * @throws SocketTimeoutException if the write times out
-     * @throws IOException if an IO Exception occurs in the underlying socket logic
-     */
-    public int write(ByteBuffer buf, NioChannel socket, Selector selector,
-                     long writeTimeout, boolean block) throws IOException {
-        if ( SHARED && block ) {
-            return blockingSelector.write(buf,socket,writeTimeout);
-        }
-        if (socket.getSocketWrapper().previousIOException != null) {
-            /*
-             * Socket has previously seen an IOException on write.
-             *
-             * Blocking writes assume that buffer is always fully written so
-             * there is no code checking for incomplete writes, retaining
-             * the unwritten data and attempting to write it as part of a
-             * subsequent write call.
-             *
-             * Because of the above, when an IOException is triggered we
-             * need so skip subsequent attempts to write as otherwise it
-             * will appear to the client as if some data was dropped just
-             * before the connection is lost. It is better if the client
-             * just sees the dropped connection.
-             */
-            throw new IOException(socket.getSocketWrapper().previousIOException);
-        }
-        SelectionKey key = null;
-        int written = 0;
-        boolean timedout = false;
-        int keycount = 1; //assume we can write
-        long time = System.currentTimeMillis(); //start the timeout timer
-        try {
-            while ((!timedout) && buf.hasRemaining()) {
-                int cnt = 0;
-                if ( keycount > 0 ) { //only write if we were registered for a write
-                    cnt = socket.write(buf); //write the data
-                    if (cnt == -1) {
-                        throw new EOFException();
-                    }
-
-                    written += cnt;
-                    if (cnt > 0) {
-                        time = System.currentTimeMillis(); //reset our timeout timer
-                        continue; //we successfully wrote, try again without a selector
-                    }
-                    if (cnt==0 && (!block))
-                     {
-                        break; //don't block
-                    }
-                }
-                if (selector != null) {
-                    //register OP_WRITE to the selector
-                    if (key == null) {
-                        key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
-                    } else {
-                        key.interestOps(SelectionKey.OP_WRITE);
-                    }
-                    if (writeTimeout == 0) {
-                        timedout = buf.hasRemaining();
-                    } else if (writeTimeout < 0) {
-                        keycount = selector.select();
-                    } else {
-                        keycount = selector.select(writeTimeout);
-                    }
-                }
-                if (writeTimeout > 0 && (selector == null || keycount == 0)) {
-                    timedout = (System.currentTimeMillis() - time) >= writeTimeout;
-                }
-            }
-            if (timedout) {
-                socket.getSocketWrapper().previousIOException = new SocketTimeoutException();
-                throw socket.getSocketWrapper().previousIOException;
-            }
-        } finally {
-            if (key != null) {
-                key.cancel();
-                if (selector != null)
-                 {
-                    selector.selectNow();//removes the key from this selector
-                }
-            }
-        }
-        return written;
-    }
-
-    /**
-     * Performs a blocking read using the bytebuffer for data to be read and a selector to block.
-     * If the <code>selector</code> parameter is null, then it will perform a busy read that could
-     * take up a lot of CPU cycles.
-     * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
-     * @param socket SocketChannel - the socket to write data to
-     * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
-     * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
-     * @return int - returns the number of bytes read
-     * @throws EOFException if read returns -1
-     * @throws SocketTimeoutException if the read times out
-     * @throws IOException if an IO Exception occurs in the underlying socket logic
-     */
-    public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout) throws IOException {
-        return read(buf,socket,selector,readTimeout,true);
-    }
-
-    /**
-     * Performs a read using the bytebuffer for data to be read and a selector to register for events should
-     * you have the block=true.
-     * If the <code>selector</code> parameter is null, then it will perform a busy read that could
-     * take up a lot of CPU cycles.
-     * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
-     * @param socket SocketChannel - the socket to write data to
-     * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
-     * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
-     * @param block - true if you want to block until data becomes available or timeout time has been reached
-     * @return int - returns the number of bytes read
-     * @throws EOFException if read returns -1
-     * @throws SocketTimeoutException if the read times out
-     * @throws IOException if an IO Exception occurs in the underlying socket logic
-     */
-    public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
-        if ( SHARED && block ) {
-            return blockingSelector.read(buf,socket,readTimeout);
-        }
-        SelectionKey key = null;
-        int read = 0;
-        boolean timedout = false;
-        int keycount = 1; //assume we can write
-        long time = System.currentTimeMillis(); //start the timeout timer
-        try {
-            while (!timedout) {
-                int cnt = 0;
-                if (keycount > 0) { //only read if we were registered for a read
-                    cnt = socket.read(buf);
-                    if (cnt == -1) {
-                        if (read == 0) {
-                            read = -1;
-                        }
-                        break;
-                    }
-                    read += cnt;
-                    if (cnt > 0)
-                     {
-                        continue; //read some more
-                    }
-                    if (cnt == 0 && (read > 0 || (!block))) {
-                        break; //we are done reading
-                    }
-                }
-                if (selector != null) {//perform a blocking read
-                    //register OP_WRITE to the selector
-                    if (key == null) {
-                        key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
-                    } else {
-                        key.interestOps(SelectionKey.OP_READ);
-                    }
-                    if (readTimeout == 0) {
-                        timedout = (read == 0);
-                    } else if (readTimeout < 0) {
-                        keycount = selector.select();
-                    } else {
-                        keycount = selector.select(readTimeout);
-                    }
-                }
-                if (readTimeout > 0 && (selector == null || keycount == 0)) {
-                    timedout = (System.currentTimeMillis() - time) >= readTimeout;
-                }
-            }
-            if (timedout) {
-                throw new SocketTimeoutException();
-            }
-        } finally {
-            if (key != null) {
-                key.cancel();
-                if (selector != null) {
-                    selector.selectNow();//removes the key from this selector
-                }
-            }
-        }
-        return read;
-    }
-
-    public void setMaxSelectors(int maxSelectors) {
-        this.maxSelectors = maxSelectors;
-    }
-
-    public void setMaxSpareSelectors(int maxSpareSelectors) {
-        this.maxSpareSelectors = maxSpareSelectors;
-    }
-
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-    }
-
-    public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
-        this.sharedSelectorTimeout = sharedSelectorTimeout;
-    }
-
-    public int getMaxSelectors() {
-        return maxSelectors;
-    }
-
-    public int getMaxSpareSelectors() {
-        return maxSpareSelectors;
-    }
-
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    public long getSharedSelectorTimeout() {
-        return sharedSelectorTimeout;
-    }
-
-    public ConcurrentLinkedQueue<Selector> getSelectors() {
-        return selectors;
-    }
-
-    public AtomicInteger getSpare() {
-        return spare;
-    }
-}
diff --git a/java/org/apache/tomcat/util/net/SecureNio2Channel.java b/java/org/apache/tomcat/util/net/SecureNio2Channel.java
index 0ebc702..7eeee7b 100644
--- a/java/org/apache/tomcat/util/net/SecureNio2Channel.java
+++ b/java/org/apache/tomcat/util/net/SecureNio2Channel.java
@@ -248,10 +248,10 @@ public class SecureNio2Channel extends Nio2Channel  {
                 case FINISHED: {
                     if (endpoint.hasNegotiableProtocols()) {
                         if (sslEngine instanceof SSLUtil.ProtocolInfo) {
-                            socket.setNegotiatedProtocol(
+                            socketWrapper.setNegotiatedProtocol(
                                     ((SSLUtil.ProtocolInfo) sslEngine).getNegotiatedProtocol());
                         } else if (JreCompat.isAlpnSupported()) {
-                            socket.setNegotiatedProtocol(
+                            socketWrapper.setNegotiatedProtocol(
                                     JreCompat.getInstance().getApplicationProtocol(sslEngine));
                         }
                     }
@@ -263,7 +263,7 @@ public class SecureNio2Channel extends Nio2Channel  {
                     } else {
                         if (async) {
                             sc.write(netOutBuffer, AbstractEndpoint.toTimeout(timeout),
-                                    TimeUnit.MILLISECONDS, socket, handshakeWriteCompletionHandler);
+                                    TimeUnit.MILLISECONDS, socketWrapper, handshakeWriteCompletionHandler);
                         } else {
                             try {
                                 if (timeout > 0) {
@@ -302,7 +302,7 @@ public class SecureNio2Channel extends Nio2Channel  {
                         //should actually return OP_READ if we have NEED_UNWRAP
                         if (async) {
                             sc.write(netOutBuffer, AbstractEndpoint.toTimeout(timeout),
-                                    TimeUnit.MILLISECONDS, socket, handshakeWriteCompletionHandler);
+                                    TimeUnit.MILLISECONDS, socketWrapper, handshakeWriteCompletionHandler);
                         } else {
                             try {
                                 if (timeout > 0) {
@@ -335,7 +335,7 @@ public class SecureNio2Channel extends Nio2Channel  {
                         //read more data
                         if (async) {
                             sc.read(netInBuffer, AbstractEndpoint.toTimeout(timeout),
-                                    TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler);
+                                    TimeUnit.MILLISECONDS, socketWrapper, handshakeReadCompletionHandler);
                         } else {
                             try {
                                 int read;
@@ -380,7 +380,7 @@ public class SecureNio2Channel extends Nio2Channel  {
         // SNIExtractor only to discover there is no data to process
         if (netInBuffer.position() == 0) {
             sc.read(netInBuffer, AbstractEndpoint.toTimeout(endpoint.getConnectionTimeout()),
-                    TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler);
+                    TimeUnit.MILLISECONDS, socketWrapper, handshakeReadCompletionHandler);
             return 1;
         }
 
@@ -396,7 +396,7 @@ public class SecureNio2Channel extends Nio2Channel  {
 
             netInBuffer = ByteBufferUtils.expand(netInBuffer, newLimit);
             sc.read(netInBuffer, AbstractEndpoint.toTimeout(endpoint.getConnectionTimeout()),
-                    TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler);
+                    TimeUnit.MILLISECONDS, socketWrapper, handshakeReadCompletionHandler);
             return 1;
         }
 
@@ -414,7 +414,7 @@ public class SecureNio2Channel extends Nio2Channel  {
             break;
         case NEED_READ:
             sc.read(netInBuffer, AbstractEndpoint.toTimeout(endpoint.getConnectionTimeout()),
-                    TimeUnit.MILLISECONDS, socket, handshakeReadCompletionHandler);
+                    TimeUnit.MILLISECONDS, socketWrapper, handshakeReadCompletionHandler);
             return 1;
         case UNDERFLOW:
             // Unable to buffer enough data to read SNI extension data
@@ -984,7 +984,7 @@ public class SecureNio2Channel extends Nio2Channel  {
                                         getBufHandler().expand(
                                                 sslEngine.getSession().getApplicationBufferSize());
                                         dst2 = getBufHandler().getReadBuffer();
-                                    } else if (dst2 == getAppReadBufHandler().getByteBuffer()) {
+                                    } else if (getAppReadBufHandler() != null && dst2 == getAppReadBufHandler().getByteBuffer()) {
                                         getAppReadBufHandler()
                                                 .expand(sslEngine.getSession().getApplicationBufferSize());
                                         dst2 = getAppReadBufHandler().getByteBuffer();
diff --git a/java/org/apache/tomcat/util/net/SecureNioChannel.java b/java/org/apache/tomcat/util/net/SecureNioChannel.java
index fb3924b8..9ba6e29 100644
--- a/java/org/apache/tomcat/util/net/SecureNioChannel.java
+++ b/java/org/apache/tomcat/util/net/SecureNioChannel.java
@@ -73,10 +73,7 @@ public class SecureNioChannel extends NioChannel {
 
     private final Map<String,List<String>> additionalTlsAttributes = new HashMap<>();
 
-    protected NioSelectorPool pool;
-
-    public SecureNioChannel(SocketChannel channel, SocketBufferHandler bufHandler,
-            NioSelectorPool pool, NioEndpoint endpoint) {
+    public SecureNioChannel(SocketBufferHandler bufHandler, NioEndpoint endpoint) {
         super(bufHandler);
 
         // Create the network buffers (these hold the encrypted data).
@@ -88,8 +85,6 @@ public class SecureNioChannel extends NioChannel {
             netOutBuffer = ByteBuffer.allocate(DEFAULT_NET_BUFFER_SIZE);
         }
 
-        // selector pool for blocking operations
-        this.pool = pool;
         this.endpoint = endpoint;
     }
 
@@ -118,28 +113,6 @@ public class SecureNioChannel extends NioChannel {
 //===========================================================================================
 
     /**
-     * Flush the channel.
-     *
-     * @param block     Should a blocking write be used?
-     * @param s         The selector to use for blocking, if null then a busy
-     *                  write will be initiated
-     * @param timeout   The timeout for this write operation in milliseconds,
-     *                  -1 means no timeout
-     * @return <code>true</code> if the network buffer has been flushed out and
-     *         is empty else <code>false</code>
-     * @throws IOException If an I/O error occurs during the operation
-     */
-    @Override
-    public boolean flush(boolean block, Selector s, long timeout) throws IOException {
-        if (!block) {
-            flush(netOutBuffer);
-        } else {
-            pool.write(netOutBuffer, this, s, timeout, block);
-        }
-        return !netOutBuffer.hasRemaining();
-    }
-
-    /**
      * Flushes the buffer to the network, non blocking
      * @param buf ByteBuffer
      * @return boolean true if the buffer has been emptied out, false otherwise
@@ -587,7 +560,6 @@ public class SecureNioChannel extends NioChannel {
         } finally {
             if (force || closed) {
                 closed = true;
-                sc.socket().close();
                 sc.close();
             }
         }
@@ -817,8 +789,6 @@ public class SecureNioChannel extends NioChannel {
     public int write(ByteBuffer src) throws IOException {
         checkInterruptStatus();
         if (src == this.netOutBuffer) {
-            //we can get here through a recursive call
-            //by using the NioBlockingSelector
             int written = sc.write(src);
             return written;
         } else {
@@ -832,6 +802,11 @@ public class SecureNioChannel extends NioChannel {
                 return 0;
             }
 
+            if (!src.hasRemaining()) {
+                // Nothing left to write
+                return 0;
+            }
+
             // The data buffer is empty, we can reuse the entire buffer.
             netOutBuffer.clear();
 
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index e395fd5..22ee3d6 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -105,6 +105,12 @@ public abstract class SocketWrapperBase<E> {
     protected final Semaphore writePending;
     protected volatile OperationState<?> writeOperation = null;
 
+    /**
+     * The org.apache.coyote.Processor instance currently associated
+     * with the wrapper.
+     */
+    protected Object currentProcessor = null;
+
     public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
         this.socket = socket;
         this.endpoint = endpoint;
@@ -129,6 +135,14 @@ public abstract class SocketWrapperBase<E> {
         return endpoint;
     }
 
+    public Object getCurrentProcessor() {
+        return currentProcessor;
+    }
+
+    public void setCurrentProcessor(Object currentProcessor) {
+        this.currentProcessor = currentProcessor;
+    }
+
     /**
      * Transfers processing to a container thread.
      *
@@ -396,7 +410,17 @@ public abstract class SocketWrapperBase<E> {
      */
     public void close() {
         if (closed.compareAndSet(false, true)) {
-            doClose();
+            try {
+                getEndpoint().getHandler().release(this);
+            } catch (Throwable e) {
+                ExceptionUtils.handleThrowable(e);
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("endpoint.debug.handlerRelease"), e);
+                }
+            } finally {
+                getEndpoint().countDownConnection();
+                doClose();
+            }
         }
     }
 
diff --git a/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java b/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
index 81043d8..91242e8 100644
--- a/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
+++ b/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
@@ -20,7 +20,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import javax.websocket.SendHandler;
 import javax.websocket.SendResult;
@@ -28,8 +30,8 @@ import javax.websocket.SendResult;
 import org.apache.coyote.http11.upgrade.UpgradeInfo;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.SocketWrapperBase;
+import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.websocket.Transformation;
 import org.apache.tomcat.websocket.WsRemoteEndpointImplBase;
@@ -67,18 +69,76 @@ public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
 
 
     @Override
-    protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
+    protected void doWrite(final SendHandler handler, final long blockingWriteTimeoutExpiry,
             ByteBuffer... buffers) {
-        if (blockingWriteTimeoutExpiry == -1) {
-            this.handler = handler;
-            this.buffers = buffers;
-            // This is definitely the same thread that triggered the write so a
-            // dispatch will be required.
-            onWritePossible(true);
+        if (socketWrapper.hasAsyncIO()) {
+            final boolean block = (blockingWriteTimeoutExpiry != -1);
+            long timeout = -1;
+            if (block) {
+                timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+                if (timeout <= 0) {
+                    SendResult sr = new SendResult(new SocketTimeoutException());
+                    handler.onResult(sr);
+                    return;
+                }
+            } else {
+                this.handler = handler;
+                timeout = getSendTimeout();
+                if (timeout > 0) {
+                    // Register with timeout thread
+                    timeoutExpiry = timeout + System.currentTimeMillis();
+                    wsWriteTimeout.register(this);
+                }
+            }
+            socketWrapper.write(block ? BlockingMode.BLOCK : BlockingMode.SEMI_BLOCK, timeout,
+                    TimeUnit.MILLISECONDS, null, SocketWrapperBase.COMPLETE_WRITE_WITH_COMPLETION,
+                    new CompletionHandler<Long, Void>() {
+                        @Override
+                        public void completed(Long result, Void attachment) {
+                            if (block) {
+                                long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+                                if (timeout <= 0) {
+                                    failed(new SocketTimeoutException(), null);
+                                } else {
+                                    handler.onResult(SENDRESULT_OK);
+                                }
+                            } else {
+                                wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
+                                clearHandler(null, true);
+                            }
+                        }
+                        @Override
+                        public void failed(Throwable exc, Void attachment) {
+                            if (block) {
+                                SendResult sr = new SendResult(exc);
+                                handler.onResult(sr);
+                            } else {
+                                wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
+                                clearHandler(exc, true);
+                                close();
+                            }
+                        }
+                    }, buffers);
         } else {
-            // Blocking
-            try {
-                for (ByteBuffer buffer : buffers) {
+            if (blockingWriteTimeoutExpiry == -1) {
+                this.handler = handler;
+                this.buffers = buffers;
+                // This is definitely the same thread that triggered the write so a
+                // dispatch will be required.
+                onWritePossible(true);
+            } else {
+                // Blocking
+                try {
+                    for (ByteBuffer buffer : buffers) {
+                        long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
+                        if (timeout <= 0) {
+                            SendResult sr = new SendResult(new SocketTimeoutException());
+                            handler.onResult(sr);
+                            return;
+                        }
+                        socketWrapper.setWriteTimeout(timeout);
+                        socketWrapper.write(true, buffer);
+                    }
                     long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
                     if (timeout <= 0) {
                         SendResult sr = new SendResult(new SocketTimeoutException());
@@ -86,20 +146,12 @@ public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
                         return;
                     }
                     socketWrapper.setWriteTimeout(timeout);
-                    socketWrapper.write(true, buffer);
-                }
-                long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
-                if (timeout <= 0) {
-                    SendResult sr = new SendResult(new SocketTimeoutException());
+                    socketWrapper.flush(true);
+                    handler.onResult(SENDRESULT_OK);
+                } catch (IOException e) {
+                    SendResult sr = new SendResult(e);
                     handler.onResult(sr);
-                    return;
                 }
-                socketWrapper.setWriteTimeout(timeout);
-                socketWrapper.flush(true);
-                handler.onResult(SENDRESULT_OK);
-            } catch (IOException e) {
-                SendResult sr = new SendResult(e);
-                handler.onResult(sr);
             }
         }
     }
@@ -113,6 +165,7 @@ public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
 
 
     public void onWritePossible(boolean useDispatch) {
+        // Note: Unused for async IO
         ByteBuffer[] buffers = this.buffers;
         if (buffers == null) {
             // Servlet 3.1 will call the write listener once even if nothing
@@ -228,11 +281,9 @@ public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
         if (sh != null) {
             if (useDispatch) {
                 OnResultRunnable r = new OnResultRunnable(sh, t);
-                AbstractEndpoint<?,?> endpoint = socketWrapper.getEndpoint();
-                Executor containerExecutor = endpoint.getExecutor();
-                if (endpoint.isRunning() && containerExecutor != null) {
-                    containerExecutor.execute(r);
-                } else {
+                try {
+                    socketWrapper.execute(r);
+                } catch (RejectedExecutionException ree) {
                     // Can't use the executor so call the runnable directly.
                     // This may not be strictly specification compliant in all
                     // cases but during shutdown only close messages are going

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org