You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2022/02/18 14:50:10 UTC

[tomcat] 01/06: Back-port of sock wrapper close refactoring

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 7067ee3941dd9169365a988e13bf3fd9bf952fa7
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Fri Feb 18 13:52:42 2022 +0000

    Back-port of sock wrapper close refactoring
---
 .../apache/coyote/http2/Http2UpgradeHandler.java   |   4 +-
 java/org/apache/tomcat/util/net/AprEndpoint.java   |  68 ++++++--------
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  |  36 +++-----
 java/org/apache/tomcat/util/net/NioChannel.java    |  50 +++++++++-
 java/org/apache/tomcat/util/net/NioEndpoint.java   | 101 ++++++++++++---------
 .../apache/tomcat/util/net/SocketWrapperBase.java  |  31 ++++++-
 6 files changed, 175 insertions(+), 115 deletions(-)

diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index a2c87ed..390463f 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -1202,8 +1202,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
         }
         try {
             socketWrapper.close();
-        } catch (IOException ioe) {
-            log.debug(sm.getString("upgradeHandler.socketCloseFailed"), ioe);
+        } catch (Exception e) {
+            log.debug(sm.getString("upgradeHandler.socketCloseFailed"), e);
         }
     }
 
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index 51de670..5145432 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -574,8 +574,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
                 wl.lock();
                 try {
                     socketWrapper.close();
-                } catch (IOException e) {
-                    // Ignore
                 } finally {
                     wl.unlock();
                 }
@@ -2228,9 +2226,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         private final ByteBuffer sslOutputBuffer;
 
-        private final Object closedLock = new Object();
-        private volatile boolean closed = false;
-
         // This field should only be used by Poller#run()
         private int pollerFlags = 0;
 
@@ -2344,7 +2339,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
 
         private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
-            if (closed) {
+            if (isClosed()) {
                 throw new IOException(sm.getString("socket.apr.closed", getSocket()));
             }
 
@@ -2441,15 +2436,18 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
 
         @Override
-        public void close() {
-            getEndpoint().getHandler().release(this);
-            synchronized (closedLock) {
-                // APR typically crashes if the same socket is closed twice so
-                // make sure that doesn't happen.
-                if (closed) {
-                    return;
+        protected void doClose() {
+            try {
+                getEndpoint().getHandler().release(this);
+            } catch (Throwable e) {
+                ExceptionUtils.handleThrowable(e);
+                if (log.isDebugEnabled()) {
+                    log.error(sm.getString("endpoint.debug.handlerRelease"), e);
                 }
-                closed = true;
+            }
+            socketBufferHandler = SocketBufferHandler.EMPTY;
+            nonBlockingWriteBuffer.clear();
+            synchronized (closed) {
                 if (sslOutputBuffer != null) {
                     ByteBufferUtils.cleanDirectBuffer(sslOutputBuffer);
                 }
@@ -2462,16 +2460,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
 
         @Override
-        public boolean isClosed() {
-            synchronized (closedLock) {
-                return closed;
-            }
-        }
-
-
-        @Override
         protected void doWrite(boolean block, ByteBuffer from) throws IOException {
-            if (closed) {
+            if (isClosed()) {
                 throw new IOException(sm.getString("socket.apr.closed", getSocket()));
             }
 
@@ -2591,8 +2581,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         @Override
         public void registerReadInterest() {
             // Make sure an already closed socket is not added to the poller
-            synchronized (closedLock) {
-                if (closed) {
+            synchronized (closed) {
+                if (isClosed()) {
                     return;
                 }
                 if (log.isDebugEnabled()) {
@@ -2609,8 +2599,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         @Override
         public void registerWriteInterest() {
             // Make sure an already closed socket is not added to the poller
-            synchronized (closedLock) {
-                if (closed) {
+            synchronized (closed) {
+                if (isClosed()) {
                     return;
                 }
                 if (log.isDebugEnabled()) {
@@ -2637,7 +2627,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         @Override
         protected void populateRemoteAddr() {
-            if (closed) {
+            if (isClosed()) {
                 return;
             }
             try {
@@ -2652,7 +2642,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         @Override
         protected void populateRemoteHost() {
-            if (closed) {
+            if (isClosed()) {
                 return;
             }
             try {
@@ -2670,7 +2660,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         @Override
         protected void populateRemotePort() {
-            if (closed) {
+            if (isClosed()) {
                 return;
             }
             try {
@@ -2686,7 +2676,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         @Override
         protected void populateLocalName() {
-            if (closed) {
+            if (isClosed()) {
                 return;
             }
             try {
@@ -2701,7 +2691,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         @Override
         protected void populateLocalAddr() {
-            if (closed) {
+            if (isClosed()) {
                 return;
             }
             try {
@@ -2716,7 +2706,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
 
         @Override
         protected void populateLocalPort() {
-            if (closed) {
+            if (isClosed()) {
                 return;
             }
             try {
@@ -2800,8 +2790,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         }
 
         String getSSLInfoS(int id) {
-            synchronized (closedLock) {
-                if (closed) {
+            synchronized (closed) {
+                if (isClosed()) {
                     return null;
                 }
                 try {
@@ -2813,8 +2803,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         }
 
         int getSSLInfoI(int id) {
-            synchronized (closedLock) {
-                if (closed) {
+            synchronized (closed) {
+                if (isClosed()) {
                     return 0;
                 }
                 try {
@@ -2826,8 +2816,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
         }
 
         byte[] getSSLInfoB(int id) {
-            synchronized (closedLock) {
-                if (closed) {
+            synchronized (closed) {
+                if (isClosed()) {
                     return null;
                 }
                 try {
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 6adf5d9..e9c234f 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -487,6 +487,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
     public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {
 
+        private final SynchronizedStack<Nio2Channel> nioChannels;
+
         private SendfileData sendfileData = null;
 
         private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
@@ -499,7 +501,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         private final Semaphore writePending = new Semaphore(1);
         private boolean writeInterest = false; // Guarded by writeCompletionHandler
         private boolean writeNotify = false;
-        private volatile boolean closed = false;
 
         private CompletionHandler<Integer, SendfileData> sendfileHandler
             = new CompletionHandler<Integer, SendfileData>() {
@@ -584,6 +585,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
         public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
             super(channel, endpoint);
+            nioChannels = endpoint.nioChannels;
             socketBufferHandler = channel.getBufHandler();
 
             this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@@ -914,7 +916,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
 
 
         @Override
-        public void close() {
+        protected void doClose() {
             if (log.isDebugEnabled()) {
                 log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])", new Exception());
             }
@@ -928,19 +930,25 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             }
             try {
                 synchronized (getSocket()) {
-                    if (!closed) {
-                        closed = true;
-                        getEndpoint().countDownConnection();
-                    }
+                    getEndpoint().countDownConnection();
                     if (getSocket().isOpen()) {
                         getSocket().close(true);
                     }
                 }
+                if (getEndpoint().running && !getEndpoint().paused) {
+                    if (nioChannels == null || !nioChannels.push(getSocket())) {
+                        getSocket().free();
+                    }
+                }
             } catch (Throwable e) {
                 ExceptionUtils.handleThrowable(e);
                 if (log.isDebugEnabled()) {
                     log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
                 }
+            } finally {
+                socketBufferHandler = SocketBufferHandler.EMPTY;
+                nonBlockingWriteBuffer.clear();
+                reset(Nio2Channel.CLOSED_NIO2_CHANNEL);
             }
             try {
                 SendfileData data = getSendfileData();
@@ -956,12 +964,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         }
 
         @Override
-        public boolean isClosed() {
-            return closed;
-        }
-
-
-        @Override
         public boolean hasAsyncIO() {
             return getEndpoint().getUseAsyncIO();
         }
@@ -1682,22 +1684,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                     if (state == SocketState.CLOSED) {
                         // Close socket and pool
                         socketWrapper.close();
-                        if (running) {
-                            if (nioChannels == null || !nioChannels.push(socketWrapper.getSocket())) {
-                                socketWrapper.getSocket().free();
-                            }
-                        }
                     } else if (state == SocketState.UPGRADING) {
                         launch = true;
                     }
                 } else if (handshake == -1 ) {
                     getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                     socketWrapper.close();
-                    if (running) {
-                        if (nioChannels == null || !nioChannels.push(socketWrapper.getSocket())) {
-                            socketWrapper.getSocket().free();
-                        }
-                    }
                 }
             } catch (VirtualMachineError vme) {
                 ExceptionUtils.handleThrowable(vme);
diff --git a/java/org/apache/tomcat/util/net/NioChannel.java b/java/org/apache/tomcat/util/net/NioChannel.java
index 8f97802..ff8d329 100644
--- a/java/org/apache/tomcat/util/net/NioChannel.java
+++ b/java/org/apache/tomcat/util/net/NioChannel.java
@@ -102,8 +102,7 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
      */
     @Override
     public void close() throws IOException {
-        getIOChannel().socket().close();
-        getIOChannel().close();
+        sc.close();
     }
 
     /**
@@ -222,8 +221,8 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
         this.poller = poller;
     }
 
-    public void setIOChannel(SocketChannel IOChannel) {
-        this.sc = IOChannel;
+    public void setIOChannel(SocketChannel sc) {
+        this.sc = sc;
     }
 
     @Override
@@ -270,4 +269,47 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
     protected ApplicationBufferHandler getAppReadBufHandler() {
         return appReadBufHandler;
     }
+
+    static final NioChannel CLOSED_NIO_CHANNEL = new ClosedNioChannel(); // Shared placeholder; NioSocketWrapper.doClose() installs it via reset()
+    public static class ClosedNioChannel extends NioChannel { // Stand-in channel that always reports itself as closed
+        public ClosedNioChannel() {
+            super(null, null); // Both superclass constructor arguments are null
+        }
+        @Override
+        public void close() throws IOException { // No-op
+        }
+        @Override
+        public boolean isOpen() {
+            return false; // Never open
+        }
+        @Override
+        public void reset() throws IOException { // No-op
+        }
+        @Override
+        public void free() { // No-op
+        }
+        @Override
+        public int read(ByteBuffer dst) throws IOException {
+            return -1; // Always -1; dst is not touched
+        }
+        @Override
+        public long read(ByteBuffer[] dsts, int offset, int length)
+                throws IOException {
+            return -1L; // Always -1; dsts is not touched
+        }
+        @Override
+        public int write(ByteBuffer src) throws IOException {
+            checkInterruptStatus(); // Defined outside this hunk; presumably throws if the thread is interrupted - TODO confirm
+            return -1; // Always -1; src is not consumed
+        }
+        @Override
+        public long write(ByteBuffer[] srcs, int offset, int length)
+                throws IOException {
+            return -1L; // Always -1; unlike write(ByteBuffer), no interrupt check is made here
+        }
+        @Override
+        public String toString() {
+            return "Closed NioChannel";
+        }
+    }
 }
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 6f31c0c..165aa2f 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -610,29 +610,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     }
 
 
-    private void close(NioChannel socket, SelectionKey key) {
-        try {
-            if (socket.getPoller().cancelledKey(key) != null) {
-                // SocketWrapper (attachment) was removed from the
-                // key - recycle the key. This can only happen once
-                // per attempted closure so it is used to determine
-                // whether or not to return the key to the cache.
-                // We do NOT want to do this more than once - see BZ
-                // 57340 / 57943.
-                if (log.isDebugEnabled()) {
-                    log.debug("Socket: [" + socket + "] closed");
-                }
-                if (running) {
-                    if (nioChannels == null || !nioChannels.push(socket)) {
-                        socket.free();
-                    }
-                }
-            }
-        } catch (Exception x) {
-            log.error("",x);
-        }
-    }
-
     // ----------------------------------------------------- Poller Inner Classes
 
     /**
@@ -770,7 +747,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                         // since it won't have been counted down when the socket
                         // closed.
                         socketWrapper.getEndpoint().countDownConnection();
-                        ((NioSocketWrapper) socketWrapper).closed = true;
+                        ((NioSocketWrapper) socketWrapper).closed.set(true);
                     } else {
                         final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
                         if (attachment != null) {
@@ -876,7 +853,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                 }
                 if (ka != null) {
                     countDownConnection();
-                    ka.closed = true;
+                    ka.closed.set(true);
                 }
             } catch (Throwable e) {
                 ExceptionUtils.handleThrowable(e);
@@ -1053,7 +1030,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                             if (log.isDebugEnabled()) {
                                 log.debug("Send file connection is being closed");
                             }
-                            close(sc, sk);
+                            poller.cancelledKey(sk);
+                            socketWrapper.close();
                             break;
                         }
                         case PIPELINED: {
@@ -1061,7 +1039,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                                 log.debug("Connection is keep alive, processing pipe-lined data");
                             }
                             if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
-                                close(sc, sk);
+                                poller.cancelledKey(sk);
+                                socketWrapper.close();
                             }
                             break;
                         }
@@ -1091,13 +1070,15 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                     log.debug("Unable to complete sendfile request:", e);
                 }
                 if (!calledByProcessor && sc != null) {
-                    close(sc, sk);
+                    poller.cancelledKey(sk);
+                    socketWrapper.close();
                 }
                 return SendfileState.ERROR;
             } catch (Throwable t) {
                 log.error(sm.getString("endpoint.sendfile.error"), t);
                 if (!calledByProcessor && sc != null) {
-                    close(sc, sk);
+                    poller.cancelledKey(sk);
+                    socketWrapper.close();
                 }
                 return SendfileState.ERROR;
             }
@@ -1204,6 +1185,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
 
         private final NioSelectorPool pool;
+        private final SynchronizedStack<NioChannel> nioChannels;
 
         private Poller poller = null;
         private int interestOps = 0;
@@ -1212,12 +1194,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         private volatile SendfileData sendfileData = null;
         private volatile long lastRead = System.currentTimeMillis();
         private volatile long lastWrite = lastRead;
-        private volatile boolean closed = false;
 
         public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
             super(channel, endpoint);
             pool = endpoint.getSelectorPool();
             socketBufferHandler = channel.getBufHandler();
+            nioChannels = endpoint.nioChannels;
         }
 
         public Poller getPoller() { return poller; }
@@ -1352,18 +1334,43 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
 
 
         @Override
-        public void close() throws IOException {
-            getSocket().close();
-            getEndpoint().getHandler().release(this);
-        }
-
-
-        @Override
-        public boolean isClosed() {
-            return closed;
+        protected void doClose() { // Counts down the endpoint connection, closes then recycles or frees the channel, and closes any open sendfile file channel
+            if (log.isDebugEnabled()) {
+                log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
+            }
+            try {
+                getEndpoint().countDownConnection();
+                if (getSocket().isOpen()) {
+                    getSocket().close(true);
+                }
+                if (getEndpoint().running) { // The channel is only offered back to the cache while the endpoint is running
+                    if (nioChannels == null || !nioChannels.push(getSocket())) { // No cache, or push() returned false: free the channel instead
+                        getSocket().free();
+                    }
+                }
+            } catch (Throwable e) {
+                ExceptionUtils.handleThrowable(e);
+                if (log.isDebugEnabled()) {
+                    log.error(sm.getString("endpoint.debug.channelCloseFail"), e); // Emitted at error level, but only when debug logging is enabled
+                }
+            } finally {
+                socketBufferHandler = SocketBufferHandler.EMPTY; // This finally block runs even if the close above threw
+                nonBlockingWriteBuffer.clear();
+                reset(NioChannel.CLOSED_NIO_CHANNEL); // From here on getSocket() returns the closed placeholder channel
+            }
+            try {
+                SendfileData data = getSendfileData();
+                if (data != null && data.fchannel != null && data.fchannel.isOpen()) { // Close the sendfile file channel only if one exists and is still open
+                    data.fchannel.close();
+                }
+            } catch (Throwable e) {
+                ExceptionUtils.handleThrowable(e);
+                if (log.isDebugEnabled()) {
+                    log.error(sm.getString("endpoint.sendfile.closeError"), e); // Same pattern: error level, guarded by the debug check
+                }
+            }
         }
 
-
         private int fillReadBuffer(boolean block) throws IOException {
             socketBufferHandler.configureReadBufferForWrite();
             return fillReadBuffer(block, socketBufferHandler.getReadBuffer());
@@ -1702,10 +1709,14 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
         protected void doRun() {
             NioChannel socket = socketWrapper.getSocket();
             SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+            Poller poller = NioEndpoint.this.poller;
+            if (poller == null) {
+                socketWrapper.close();
+                return;
+            }
 
             try {
                 int handshake = -1;
-
                 try {
                     if (key != null) {
                         if (socket.isHandshakeComplete()) {
@@ -1746,12 +1757,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                         state = getHandler().process(socketWrapper, event);
                     }
                     if (state == SocketState.CLOSED) {
-                        close(socket, key);
-                    }
+                        poller.cancelledKey(key);
+                        socketWrapper.close();                    }
                 } else if (handshake == -1 ) {
                     getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
-                    close(socket, key);
-                } else if (handshake == SelectionKey.OP_READ){
+                    poller.cancelledKey(key);
+                    socketWrapper.close();                } else if (handshake == SelectionKey.OP_READ){
                     socketWrapper.registerReadInterest();
                 } else if (handshake == SelectionKey.OP_WRITE){
                     socketWrapper.registerWriteInterest();
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index 3db0812..e395fd5 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -41,9 +41,11 @@ public abstract class SocketWrapperBase<E> {
 
     protected static final StringManager sm = StringManager.getManager(SocketWrapperBase.class);
 
-    private final E socket;
+    private E socket;
     private final AbstractEndpoint<E,?> endpoint;
 
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
+
     // Volatile because I/O and setting the timeout values occurs on a different
     // thread to the thread checking the timeout.
     private volatile long readTimeout = -1;
@@ -119,6 +121,10 @@ public abstract class SocketWrapperBase<E> {
         return socket;
     }
 
+    protected void reset(E closedSocket) { // Replaces the wrapped socket; the NIO and NIO2 doClose() methods pass their closed placeholder channel
+        socket = closedSocket;
+    }
+
     public AbstractEndpoint<E,?> getEndpoint() {
         return endpoint;
     }
@@ -385,8 +391,27 @@ public abstract class SocketWrapperBase<E> {
     }
 
 
-    public abstract void close() throws IOException;
-    public abstract boolean isClosed();
+    /**
+     * Close the socket wrapper. Only the first call runs {@link #doClose()}; later calls do nothing.
+     */
+    public void close() {
+        if (closed.compareAndSet(false, true)) { // Atomic false-to-true flip; only the caller that wins it proceeds
+            doClose();
+        }
+    }
+
+    /**
+     * Perform the actual close. Invoked by {@link #close()} only when it flips {@code closed} from
+     * false to true, so it runs at most once; it never runs if {@code closed} was set directly first.
+     */
+    protected abstract void doClose();
+
+    /**
+     * @return true once {@code closed} has been set, which {@link #close()} does before running {@link #doClose()}
+     */
+    public boolean isClosed() {
+        return closed.get(); // Plain read of the AtomicBoolean; no locking
+    }
 
 
     /**

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org