You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2022/02/18 14:50:10 UTC
[tomcat] 01/06: Back-port of sock wrapper close refactoring
This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 7067ee3941dd9169365a988e13bf3fd9bf952fa7
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Fri Feb 18 13:52:42 2022 +0000
Back-port of sock wrapper close refactoring
---
.../apache/coyote/http2/Http2UpgradeHandler.java | 4 +-
java/org/apache/tomcat/util/net/AprEndpoint.java | 68 ++++++--------
java/org/apache/tomcat/util/net/Nio2Endpoint.java | 36 +++-----
java/org/apache/tomcat/util/net/NioChannel.java | 50 +++++++++-
java/org/apache/tomcat/util/net/NioEndpoint.java | 101 ++++++++++++---------
.../apache/tomcat/util/net/SocketWrapperBase.java | 31 ++++++-
6 files changed, 175 insertions(+), 115 deletions(-)
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index a2c87ed..390463f 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -1202,8 +1202,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH
}
try {
socketWrapper.close();
- } catch (IOException ioe) {
- log.debug(sm.getString("upgradeHandler.socketCloseFailed"), ioe);
+ } catch (Exception e) {
+ log.debug(sm.getString("upgradeHandler.socketCloseFailed"), e);
}
}
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index 51de670..5145432 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -574,8 +574,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
wl.lock();
try {
socketWrapper.close();
- } catch (IOException e) {
- // Ignore
} finally {
wl.unlock();
}
@@ -2228,9 +2226,6 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
private final ByteBuffer sslOutputBuffer;
- private final Object closedLock = new Object();
- private volatile boolean closed = false;
-
// This field should only be used by Poller#run()
private int pollerFlags = 0;
@@ -2344,7 +2339,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
- if (closed) {
+ if (isClosed()) {
throw new IOException(sm.getString("socket.apr.closed", getSocket()));
}
@@ -2441,15 +2436,18 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
- public void close() {
- getEndpoint().getHandler().release(this);
- synchronized (closedLock) {
- // APR typically crashes if the same socket is closed twice so
- // make sure that doesn't happen.
- if (closed) {
- return;
+ protected void doClose() {
+ try {
+ getEndpoint().getHandler().release(this);
+ } catch (Throwable e) {
+ ExceptionUtils.handleThrowable(e);
+ if (log.isDebugEnabled()) {
+ log.error(sm.getString("endpoint.debug.handlerRelease"), e);
}
- closed = true;
+ }
+ socketBufferHandler = SocketBufferHandler.EMPTY;
+ nonBlockingWriteBuffer.clear();
+ synchronized (closed) {
if (sslOutputBuffer != null) {
ByteBufferUtils.cleanDirectBuffer(sslOutputBuffer);
}
@@ -2462,16 +2460,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
- public boolean isClosed() {
- synchronized (closedLock) {
- return closed;
- }
- }
-
-
- @Override
protected void doWrite(boolean block, ByteBuffer from) throws IOException {
- if (closed) {
+ if (isClosed()) {
throw new IOException(sm.getString("socket.apr.closed", getSocket()));
}
@@ -2591,8 +2581,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
public void registerReadInterest() {
// Make sure an already closed socket is not added to the poller
- synchronized (closedLock) {
- if (closed) {
+ synchronized (closed) {
+ if (isClosed()) {
return;
}
if (log.isDebugEnabled()) {
@@ -2609,8 +2599,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
public void registerWriteInterest() {
// Make sure an already closed socket is not added to the poller
- synchronized (closedLock) {
- if (closed) {
+ synchronized (closed) {
+ if (isClosed()) {
return;
}
if (log.isDebugEnabled()) {
@@ -2637,7 +2627,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
protected void populateRemoteAddr() {
- if (closed) {
+ if (isClosed()) {
return;
}
try {
@@ -2652,7 +2642,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
protected void populateRemoteHost() {
- if (closed) {
+ if (isClosed()) {
return;
}
try {
@@ -2670,7 +2660,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
protected void populateRemotePort() {
- if (closed) {
+ if (isClosed()) {
return;
}
try {
@@ -2686,7 +2676,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
protected void populateLocalName() {
- if (closed) {
+ if (isClosed()) {
return;
}
try {
@@ -2701,7 +2691,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
protected void populateLocalAddr() {
- if (closed) {
+ if (isClosed()) {
return;
}
try {
@@ -2716,7 +2706,7 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
@Override
protected void populateLocalPort() {
- if (closed) {
+ if (isClosed()) {
return;
}
try {
@@ -2800,8 +2790,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
}
String getSSLInfoS(int id) {
- synchronized (closedLock) {
- if (closed) {
+ synchronized (closed) {
+ if (isClosed()) {
return null;
}
try {
@@ -2813,8 +2803,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
}
int getSSLInfoI(int id) {
- synchronized (closedLock) {
- if (closed) {
+ synchronized (closed) {
+ if (isClosed()) {
return 0;
}
try {
@@ -2826,8 +2816,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
}
byte[] getSSLInfoB(int id) {
- synchronized (closedLock) {
- if (closed) {
+ synchronized (closed) {
+ if (isClosed()) {
return null;
}
try {
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 6adf5d9..e9c234f 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -487,6 +487,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {
+ private final SynchronizedStack<Nio2Channel> nioChannels;
+
private SendfileData sendfileData = null;
private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
@@ -499,7 +501,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
private final Semaphore writePending = new Semaphore(1);
private boolean writeInterest = false; // Guarded by writeCompletionHandler
private boolean writeNotify = false;
- private volatile boolean closed = false;
private CompletionHandler<Integer, SendfileData> sendfileHandler
= new CompletionHandler<Integer, SendfileData>() {
@@ -584,6 +585,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
super(channel, endpoint);
+ nioChannels = endpoint.nioChannels;
socketBufferHandler = channel.getBufHandler();
this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@@ -914,7 +916,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
@Override
- public void close() {
+ protected void doClose() {
if (log.isDebugEnabled()) {
log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])", new Exception());
}
@@ -928,19 +930,25 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
}
try {
synchronized (getSocket()) {
- if (!closed) {
- closed = true;
- getEndpoint().countDownConnection();
- }
+ getEndpoint().countDownConnection();
if (getSocket().isOpen()) {
getSocket().close(true);
}
}
+ if (getEndpoint().running && !getEndpoint().paused) {
+ if (nioChannels == null || !nioChannels.push(getSocket())) {
+ getSocket().free();
+ }
+ }
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) {
log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
}
+ } finally {
+ socketBufferHandler = SocketBufferHandler.EMPTY;
+ nonBlockingWriteBuffer.clear();
+ reset(Nio2Channel.CLOSED_NIO2_CHANNEL);
}
try {
SendfileData data = getSendfileData();
@@ -956,12 +964,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
}
@Override
- public boolean isClosed() {
- return closed;
- }
-
-
- @Override
public boolean hasAsyncIO() {
return getEndpoint().getUseAsyncIO();
}
@@ -1682,22 +1684,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
if (state == SocketState.CLOSED) {
// Close socket and pool
socketWrapper.close();
- if (running) {
- if (nioChannels == null || !nioChannels.push(socketWrapper.getSocket())) {
- socketWrapper.getSocket().free();
- }
- }
} else if (state == SocketState.UPGRADING) {
launch = true;
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
socketWrapper.close();
- if (running) {
- if (nioChannels == null || !nioChannels.push(socketWrapper.getSocket())) {
- socketWrapper.getSocket().free();
- }
- }
}
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
diff --git a/java/org/apache/tomcat/util/net/NioChannel.java b/java/org/apache/tomcat/util/net/NioChannel.java
index 8f97802..ff8d329 100644
--- a/java/org/apache/tomcat/util/net/NioChannel.java
+++ b/java/org/apache/tomcat/util/net/NioChannel.java
@@ -102,8 +102,7 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
*/
@Override
public void close() throws IOException {
- getIOChannel().socket().close();
- getIOChannel().close();
+ sc.close();
}
/**
@@ -222,8 +221,8 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
this.poller = poller;
}
- public void setIOChannel(SocketChannel IOChannel) {
- this.sc = IOChannel;
+ public void setIOChannel(SocketChannel sc) {
+ this.sc = sc;
}
@Override
@@ -270,4 +269,47 @@ public class NioChannel implements ByteChannel, ScatteringByteChannel, Gathering
protected ApplicationBufferHandler getAppReadBufHandler() {
return appReadBufHandler;
}
+
+ static final NioChannel CLOSED_NIO_CHANNEL = new ClosedNioChannel();
+ public static class ClosedNioChannel extends NioChannel {
+ public ClosedNioChannel() {
+ super(null, null);
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+ @Override
+ public void reset() throws IOException {
+ }
+ @Override
+ public void free() {
+ }
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ return -1;
+ }
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length)
+ throws IOException {
+ return -1L;
+ }
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ checkInterruptStatus();
+ return -1;
+ }
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length)
+ throws IOException {
+ return -1L;
+ }
+ @Override
+ public String toString() {
+ return "Closed NioChannel";
+ }
+ }
}
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 6f31c0c..165aa2f 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -610,29 +610,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
}
- private void close(NioChannel socket, SelectionKey key) {
- try {
- if (socket.getPoller().cancelledKey(key) != null) {
- // SocketWrapper (attachment) was removed from the
- // key - recycle the key. This can only happen once
- // per attempted closure so it is used to determine
- // whether or not to return the key to the cache.
- // We do NOT want to do this more than once - see BZ
- // 57340 / 57943.
- if (log.isDebugEnabled()) {
- log.debug("Socket: [" + socket + "] closed");
- }
- if (running) {
- if (nioChannels == null || !nioChannels.push(socket)) {
- socket.free();
- }
- }
- }
- } catch (Exception x) {
- log.error("",x);
- }
- }
-
// ----------------------------------------------------- Poller Inner Classes
/**
@@ -770,7 +747,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
// since it won't have been counted down when the socket
// closed.
socketWrapper.getEndpoint().countDownConnection();
- ((NioSocketWrapper) socketWrapper).closed = true;
+ ((NioSocketWrapper) socketWrapper).closed.set(true);
} else {
final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
if (attachment != null) {
@@ -876,7 +853,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
}
if (ka != null) {
countDownConnection();
- ka.closed = true;
+ ka.closed.set(true);
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
@@ -1053,7 +1030,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
if (log.isDebugEnabled()) {
log.debug("Send file connection is being closed");
}
- close(sc, sk);
+ poller.cancelledKey(sk);
+ socketWrapper.close();
break;
}
case PIPELINED: {
@@ -1061,7 +1039,8 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
log.debug("Connection is keep alive, processing pipe-lined data");
}
if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
- close(sc, sk);
+ poller.cancelledKey(sk);
+ socketWrapper.close();
}
break;
}
@@ -1091,13 +1070,15 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
log.debug("Unable to complete sendfile request:", e);
}
if (!calledByProcessor && sc != null) {
- close(sc, sk);
+ poller.cancelledKey(sk);
+ socketWrapper.close();
}
return SendfileState.ERROR;
} catch (Throwable t) {
log.error(sm.getString("endpoint.sendfile.error"), t);
if (!calledByProcessor && sc != null) {
- close(sc, sk);
+ poller.cancelledKey(sk);
+ socketWrapper.close();
}
return SendfileState.ERROR;
}
@@ -1204,6 +1185,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
private final NioSelectorPool pool;
+ private final SynchronizedStack<NioChannel> nioChannels;
private Poller poller = null;
private int interestOps = 0;
@@ -1212,12 +1194,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
private volatile SendfileData sendfileData = null;
private volatile long lastRead = System.currentTimeMillis();
private volatile long lastWrite = lastRead;
- private volatile boolean closed = false;
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
super(channel, endpoint);
pool = endpoint.getSelectorPool();
socketBufferHandler = channel.getBufHandler();
+ nioChannels = endpoint.nioChannels;
}
public Poller getPoller() { return poller; }
@@ -1352,18 +1334,43 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
@Override
- public void close() throws IOException {
- getSocket().close();
- getEndpoint().getHandler().release(this);
- }
-
-
- @Override
- public boolean isClosed() {
- return closed;
+ protected void doClose() {
+ if (log.isDebugEnabled()) {
+ log.debug("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
+ }
+ try {
+ getEndpoint().countDownConnection();
+ if (getSocket().isOpen()) {
+ getSocket().close(true);
+ }
+ if (getEndpoint().running) {
+ if (nioChannels == null || !nioChannels.push(getSocket())) {
+ getSocket().free();
+ }
+ }
+ } catch (Throwable e) {
+ ExceptionUtils.handleThrowable(e);
+ if (log.isDebugEnabled()) {
+ log.error(sm.getString("endpoint.debug.channelCloseFail"), e);
+ }
+ } finally {
+ socketBufferHandler = SocketBufferHandler.EMPTY;
+ nonBlockingWriteBuffer.clear();
+ reset(NioChannel.CLOSED_NIO_CHANNEL);
+ }
+ try {
+ SendfileData data = getSendfileData();
+ if (data != null && data.fchannel != null && data.fchannel.isOpen()) {
+ data.fchannel.close();
+ }
+ } catch (Throwable e) {
+ ExceptionUtils.handleThrowable(e);
+ if (log.isDebugEnabled()) {
+ log.error(sm.getString("endpoint.sendfile.closeError"), e);
+ }
+ }
}
-
private int fillReadBuffer(boolean block) throws IOException {
socketBufferHandler.configureReadBufferForWrite();
return fillReadBuffer(block, socketBufferHandler.getReadBuffer());
@@ -1702,10 +1709,14 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ Poller poller = NioEndpoint.this.poller;
+ if (poller == null) {
+ socketWrapper.close();
+ return;
+ }
try {
int handshake = -1;
-
try {
if (key != null) {
if (socket.isHandshakeComplete()) {
@@ -1746,12 +1757,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
- close(socket, key);
- }
+ poller.cancelledKey(key);
+ socketWrapper.close(); }
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
- close(socket, key);
- } else if (handshake == SelectionKey.OP_READ){
+ poller.cancelledKey(key);
+ socketWrapper.close(); } else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index 3db0812..e395fd5 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -41,9 +41,11 @@ public abstract class SocketWrapperBase<E> {
protected static final StringManager sm = StringManager.getManager(SocketWrapperBase.class);
- private final E socket;
+ private E socket;
private final AbstractEndpoint<E,?> endpoint;
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+
// Volatile because I/O and setting the timeout values occurs on a different
// thread to the thread checking the timeout.
private volatile long readTimeout = -1;
@@ -119,6 +121,10 @@ public abstract class SocketWrapperBase<E> {
return socket;
}
+ protected void reset(E closedSocket) {
+ socket = closedSocket;
+ }
+
public AbstractEndpoint<E,?> getEndpoint() {
return endpoint;
}
@@ -385,8 +391,27 @@ public abstract class SocketWrapperBase<E> {
}
- public abstract void close() throws IOException;
- public abstract boolean isClosed();
+ /**
+ * Close the socket wrapper.
+ */
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ doClose();
+ }
+ }
+
+ /**
+ * Perform the actual close. The closed atomic boolean guarantees this will
+ * be called only once per wrapper.
+ */
+ protected abstract void doClose();
+
+ /**
+ * @return true if the wrapper has been closed
+ */
+ public boolean isClosed() {
+ return closed.get();
+ }
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org