You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/05/14 16:00:44 UTC
[tomcat] branch master updated: Refactor async IO implementation to
SocketWrapperBase
This is an automated email from the ASF dual-hosted git repository.
remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push:
new 9d6388a Refactor async IO implementation to SocketWrapperBase
9d6388a is described below
commit 9d6388affba4e9140c50e8ca8938569f8709d008
Author: remm <re...@apache.org>
AuthorDate: Tue May 14 18:00:32 2019 +0200
Refactor async IO implementation to SocketWrapperBase
Remove all duplicate code I could find, although it is likely there will
be further tweaks needed.
---
.../catalina/security/SecurityClassLoad.java | 10 +-
java/org/apache/tomcat/util/net/AprEndpoint.java | 250 ++---------------
java/org/apache/tomcat/util/net/Nio2Endpoint.java | 224 ++-------------
java/org/apache/tomcat/util/net/NioEndpoint.java | 245 ++---------------
.../apache/tomcat/util/net/SocketWrapperBase.java | 301 ++++++++++++++++++++-
webapps/docs/changelog.xml | 4 +
6 files changed, 379 insertions(+), 655 deletions(-)
diff --git a/java/org/apache/catalina/security/SecurityClassLoad.java b/java/org/apache/catalina/security/SecurityClassLoad.java
index 05272f4..2a44caf 100644
--- a/java/org/apache/catalina/security/SecurityClassLoad.java
+++ b/java/org/apache/catalina/security/SecurityClassLoad.java
@@ -190,16 +190,14 @@ public final class SecurityClassLoad {
loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableAdd");
loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableCancel");
loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableRemove");
- loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$OperationState");
- loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$VectoredIOCompletionHandler");
- loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$OperationState");
- loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$VectoredIOCompletionHandler");
- loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$OperationState");
- loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$VectoredIOCompletionHandler");
+ loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$AprOperationState");
+ loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$NioOperationState");
+ loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$Nio2OperationState");
loader.loadClass(basePackage + "util.net.SocketWrapperBase$BlockingMode");
loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionCheck");
loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionHandlerCall");
loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionState");
+ loader.loadClass(basePackage + "util.net.SocketWrapperBase$VectoredIOCompletionHandler");
// security
loader.loadClass(basePackage + "util.security.PrivilegedGetTccl");
loader.loadClass(basePackage + "util.security.PrivilegedSetTccl");
diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java
index ad8e16d..d5898fe 100644
--- a/java/org/apache/tomcat/util/net/AprEndpoint.java
+++ b/java/org/apache/tomcat/util/net/AprEndpoint.java
@@ -22,9 +22,6 @@ import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
-import java.nio.channels.InterruptedByTimeoutException;
-import java.nio.channels.ReadPendingException;
-import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
@@ -2159,22 +2156,9 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
// This field should only be used by Poller#run()
private int pollerFlags = 0;
- private final Semaphore readPending;
- private OperationState<?> readOperation = null;
- private final Semaphore writePending;
- private OperationState<?> writeOperation = null;
-
public AprSocketWrapper(Long socket, AprEndpoint endpoint) {
super(socket, endpoint);
- if (endpoint.getUseAsyncIO()) {
- readPending = new Semaphore(1);
- writePending = new Semaphore(1);
- } else {
- readPending = null;
- writePending = null;
- }
-
// TODO Make the socketWriteBuffer size configurable and align the
// SSL and app buffer size settings with NIO & NIO2.
if (endpoint.isSSLEnabled()) {
@@ -2779,59 +2763,32 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
}
@Override
- public boolean hasAsyncIO() {
- // The semaphores are only created if async IO is enabled
- return (readPending != null);
+ protected <A> OperationState<A> newOperationState(boolean read,
+ ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+ Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+ return new AprOperationState<A>(read, buffers, offset, length, block,
+ timeout, unit, attachment, check, handler, semaphore, completion);
}
- /**
- * Internal state tracker for scatter/gather operations.
- */
- private class OperationState<A> implements Runnable {
- private final boolean read;
- private final ByteBuffer[] buffers;
- private final int offset;
- private final int length;
- private final A attachment;
- private final BlockingMode block;
- private final CompletionCheck check;
- private final CompletionHandler<Long, ? super A> handler;
- private final Semaphore semaphore;
- private final VectoredIOCompletionHandler<A> completion;
- private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
- BlockingMode block, A attachment, CompletionCheck check,
+ private class AprOperationState<A> extends OperationState<A> {
+ private AprOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
VectoredIOCompletionHandler<A> completion) {
- this.read = read;
- this.buffers = buffers;
- this.offset = offset;
- this.length = length;
- this.block = block;
- this.attachment = attachment;
- this.check = check;
- this.handler = handler;
- this.semaphore = semaphore;
- this.completion = completion;
- }
- private volatile boolean inline = true;
- private volatile long nBytes = 0;
- private volatile CompletionState state = CompletionState.PENDING;
- private boolean completionDone = true;
-
- public boolean process() {
- try {
- getEndpoint().getExecutor().execute(this);
- } catch (RejectedExecutionException ree) {
- log.warn(sm.getString("endpoint.executor.fail", AprSocketWrapper.this) , ree);
- return false;
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
+ super(read, buffers, offset, length, block,
+ timeout, unit, attachment, check, handler, semaphore, completion);
+ }
+
+ @Override
+ protected boolean isInline() {
+ return inline;
+ }
+
+ @Override
+ protected void start() {
+ run();
}
@Override
@@ -2914,168 +2871,5 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB
}
- @Override
- public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
- }
-
- @Override
- public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
- }
-
- private <A> CompletionState readOrWrite(boolean read,
- ByteBuffer[] buffers, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- IOException ioe = getError();
- if (ioe != null) {
- handler.failed(ioe, attachment);
- return CompletionState.ERROR;
- }
- if (timeout == -1) {
- timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
- } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) {
- if (read) {
- setReadTimeout(unit.toMillis(timeout));
- } else {
- setWriteTimeout(unit.toMillis(timeout));
- }
- }
- if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
- try {
- if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
- }
- } catch (InterruptedException e) {
- handler.failed(e, attachment);
- return CompletionState.ERROR;
- }
- } else {
- if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
- if (block == BlockingMode.NON_BLOCK) {
- return CompletionState.NOT_DONE;
- } else {
- handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
- return CompletionState.ERROR;
- }
- }
- }
- VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
- OperationState<A> state = new OperationState<>(read, buffers, offset, length, block,
- attachment, check, handler, read ? readPending : writePending, completion);
- if (read) {
- readOperation = state;
- } else {
- writeOperation = state;
- }
- state.run();
- if (block == BlockingMode.BLOCK) {
- synchronized (state) {
- if (state.state == CompletionState.PENDING) {
- try {
- state.wait(unit.toMillis(timeout));
- if (state.state == CompletionState.PENDING) {
- return CompletionState.ERROR;
- }
- } catch (InterruptedException e) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
- }
- }
- }
- }
- return state.state;
- }
-
- private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
- @Override
- public void completed(Long nBytes, OperationState<A> state) {
- if (nBytes.longValue() < 0) {
- failed(new EOFException(), state);
- } else {
- state.nBytes += nBytes.longValue();
- CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE;
- boolean complete = true;
- boolean completion = true;
- if (state.check != null) {
- CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
- if (call == CompletionHandlerCall.CONTINUE) {
- complete = false;
- } else if (call == CompletionHandlerCall.NONE) {
- completion = false;
- }
- }
- if (complete) {
- boolean notify = false;
- state.semaphore.release();
- if (state.read) {
- readOperation = null;
- } else {
- writeOperation = null;
- }
- if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
- notify = true;
- } else {
- state.state = currentState;
- }
- if (completion && state.handler != null) {
- state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
- }
- synchronized (state) {
- state.completionDone = true;
- if (notify) {
- state.state = currentState;
- state.notify();
- }
- }
- } else {
- synchronized (state) {
- state.completionDone = true;
- }
- state.run();
- }
- }
- }
- @Override
- public void failed(Throwable exc, OperationState<A> state) {
- IOException ioe = null;
- if (exc instanceof InterruptedByTimeoutException) {
- ioe = new SocketTimeoutException();
- exc = ioe;
- } else if (exc instanceof IOException) {
- ioe = (IOException) exc;
- }
- setError(ioe);
- boolean notify = false;
- state.semaphore.release();
- if (state.read) {
- readOperation = null;
- } else {
- writeOperation = null;
- }
- if (state.block == BlockingMode.BLOCK) {
- notify = true;
- } else {
- state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
- }
- if (state.handler != null) {
- state.handler.failed(exc, state.attachment);
- }
- synchronized (state) {
- state.completionDone = true;
- if (notify) {
- state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
- state.notify();
- }
- }
- }
- }
-
}
}
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 8f2de8d..3c166b9 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -29,15 +29,11 @@ import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
-import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.NetworkChannel;
-import java.nio.channels.ReadPendingException;
-import java.nio.channels.WritePendingException;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -466,13 +462,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
private SendfileData sendfileData = null;
private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
- private final Semaphore readPending = new Semaphore(1);
private boolean readInterest = false; // Guarded by readCompletionHandler
private boolean readNotify = false;
private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler;
private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler;
- private final Semaphore writePending = new Semaphore(1);
private boolean writeInterest = false; // Guarded by writeCompletionHandler
private boolean writeNotify = false;
private volatile boolean closed = false;
@@ -951,63 +945,42 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
return getEndpoint().getUseAsyncIO();
}
- /**
- * Internal state tracker for scatter/gather operations.
- */
- protected class OperationState<A> implements Runnable {
- protected final boolean read;
- protected final ByteBuffer[] buffers;
- protected final int offset;
- protected final int length;
- protected final A attachment;
- protected final long timeout;
- protected final TimeUnit unit;
- protected final BlockingMode block;
- protected final CompletionCheck check;
- protected final CompletionHandler<Long, ? super A> handler;
- protected final Semaphore semaphore;
- protected final VectoredIOCompletionHandler<A> completion;
- protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+ @Override
+ public boolean needSemaphores() {
+ return true;
+ }
+
+ @Override
+ public boolean hasPerOperationTimeout() {
+ return false;
+ }
+
+ @Override
+ protected <A> OperationState<A> newOperationState(boolean read,
+ ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+ Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+ return new Nio2OperationState<A>(read, buffers, offset, length, block,
+ timeout, unit, attachment, check, handler, semaphore, completion);
+ }
+
+ private class Nio2OperationState<A> extends OperationState<A> {
+ private Nio2OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler,
Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
- this.read = read;
- this.buffers = buffers;
- this.offset = offset;
- this.length = length;
- this.block = block;
- this.timeout = timeout;
- this.unit = unit;
- this.attachment = attachment;
- this.check = check;
- this.handler = handler;
- this.semaphore = semaphore;
- this.completion = completion;
- }
- protected volatile long nBytes = 0;
- protected volatile CompletionState state = CompletionState.PENDING;
-
- public boolean isInline() {
- return Nio2Endpoint.isInline();
+ super(read, buffers, offset, length, block,
+ timeout, unit, attachment, check, handler, semaphore, completion);
}
- public boolean process() {
- try {
- getEndpoint().getExecutor().execute(this);
- } catch (RejectedExecutionException ree) {
- log.warn(sm.getString("endpoint.executor.fail", Nio2SocketWrapper.this) , ree);
- return false;
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
+ @Override
+ protected boolean isInline() {
+ return Nio2Endpoint.isInline();
}
- public void start() {
+ @Override
+ protected void start() {
if (read) {
// Disable any regular read notifications caused by registerReadInterest
readNotify = true;
@@ -1051,7 +1024,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
}
@Override
public void failed(Throwable exc, Void attachment) {
- handler.failed(exc, OperationState.this.attachment);
+ handler.failed(exc, Nio2OperationState.this.attachment);
}
});
return;
@@ -1062,143 +1035,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
}
}
- @Override
- public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
- }
-
- @Override
- public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
- }
-
- private <A> CompletionState readOrWrite(boolean read,
- ByteBuffer[] buffers, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- IOException ioe = getError();
- if (ioe != null) {
- handler.failed(ioe, attachment);
- return CompletionState.ERROR;
- }
- if (timeout == -1) {
- timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
- }
- if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
- try {
- if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
- }
- } catch (InterruptedException e) {
- handler.failed(e, attachment);
- return CompletionState.ERROR;
- }
- } else {
- if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
- if (block == BlockingMode.NON_BLOCK) {
- return CompletionState.NOT_DONE;
- } else {
- handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
- return CompletionState.ERROR;
- }
- }
- }
- VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
- OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, timeout, unit,
- attachment, check, handler, read ? readPending : writePending, completion);
- state.start();
- if (block == BlockingMode.BLOCK) {
- synchronized (state) {
- if (state.state == CompletionState.PENDING) {
- try {
- state.wait(unit.toMillis(timeout));
- if (state.state == CompletionState.PENDING) {
- return CompletionState.ERROR;
- }
- } catch (InterruptedException e) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
- }
- }
- }
- }
- return state.state;
- }
-
- private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
- @Override
- public void completed(Long nBytes, OperationState<A> state) {
- if (nBytes.longValue() < 0) {
- failed(new EOFException(), state);
- } else {
- state.nBytes += nBytes.longValue();
- CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE;
- boolean complete = true;
- boolean completion = true;
- if (state.check != null) {
- CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
- if (call == CompletionHandlerCall.CONTINUE) {
- complete = false;
- } else if (call == CompletionHandlerCall.NONE) {
- completion = false;
- }
- }
- if (complete) {
- boolean notify = false;
- state.semaphore.release();
- if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
- notify = true;
- } else {
- state.state = currentState;
- }
- if (completion && state.handler != null) {
- state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
- }
- if (notify) {
- synchronized (state) {
- state.state = currentState;
- state.notify();
- }
- }
- } else {
- state.run();
- }
- }
- }
- @Override
- public void failed(Throwable exc, OperationState<A> state) {
- IOException ioe = null;
- if (exc instanceof InterruptedByTimeoutException) {
- ioe = new SocketTimeoutException();
- exc = ioe;
- } else if (exc instanceof IOException) {
- ioe = (IOException) exc;
- }
- setError(ioe);
- boolean notify = false;
- state.semaphore.release();
- if (state.block == BlockingMode.BLOCK) {
- notify = true;
- } else {
- state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
- }
- if (state.handler != null) {
- state.handler.failed(exc, state.attachment);
- }
- if (notify) {
- synchronized (state) {
- state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
- state.notify();
- }
- }
- }
- }
-
/* Callers of this method must:
* - have acquired the readPending semaphore
* - have acquired a lock on readCompletionHandler
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index a6f0f62..bdebc73 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -29,19 +29,15 @@ import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
-import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.NetworkChannel;
-import java.nio.channels.ReadPendingException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
-import java.nio.channels.WritePendingException;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -1045,10 +1041,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
private int interestOps = 0;
private CountDownLatch readLatch = null;
private CountDownLatch writeLatch = null;
- private final Semaphore readPending;
- private OperationState<?> readOperation = null;
- private final Semaphore writePending;
- private OperationState<?> writeOperation = null;
private volatile SendfileData sendfileData = null;
private volatile long lastRead = System.currentTimeMillis();
private volatile long lastWrite = lastRead;
@@ -1056,13 +1048,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
super(channel, endpoint);
- if (endpoint.getUseAsyncIO()) {
- readPending = new Semaphore(1);
- writePending = new Semaphore(1);
- } else {
- readPending = null;
- writePending = null;
- }
pool = endpoint.getSelectorPool();
socketBufferHandler = channel.getBufHandler();
}
@@ -1431,59 +1416,32 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
}
@Override
- public boolean hasAsyncIO() {
- // The semaphores are only created if async IO is enabled
- return (readPending != null);
+ protected <A> OperationState<A> newOperationState(boolean read,
+ ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+ Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+ return new NioOperationState<A>(read, buffers, offset, length, block,
+ timeout, unit, attachment, check, handler, semaphore, completion);
}
- /**
- * Internal state tracker for scatter/gather operations.
- */
- private class OperationState<A> implements Runnable {
- private final boolean read;
- private final ByteBuffer[] buffers;
- private final int offset;
- private final int length;
- private final A attachment;
- private final BlockingMode block;
- private final CompletionCheck check;
- private final CompletionHandler<Long, ? super A> handler;
- private final Semaphore semaphore;
- private final VectoredIOCompletionHandler<A> completion;
- private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
- BlockingMode block, A attachment, CompletionCheck check,
+ private class NioOperationState<A> extends OperationState<A> {
+ private NioOperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check,
CompletionHandler<Long, ? super A> handler, Semaphore semaphore,
VectoredIOCompletionHandler<A> completion) {
- this.read = read;
- this.buffers = buffers;
- this.offset = offset;
- this.length = length;
- this.block = block;
- this.attachment = attachment;
- this.check = check;
- this.handler = handler;
- this.semaphore = semaphore;
- this.completion = completion;
+ super(read, buffers, offset, length, block,
+ timeout, unit, attachment, check, handler, semaphore, completion);
}
- private volatile boolean inline = true;
- private volatile long nBytes = 0;
- private volatile CompletionState state = CompletionState.PENDING;
- private boolean completionDone = true;
- public boolean process() {
- try {
- getEndpoint().getExecutor().execute(this);
- } catch (RejectedExecutionException ree) {
- log.warn(sm.getString("endpoint.executor.fail", NioSocketWrapper.this) , ree);
- return false;
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
+ @Override
+ protected boolean isInline() {
+ return inline;
+ }
+
+ @Override
+ protected void start() {
+ run();
}
@Override
@@ -1560,169 +1518,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
}
- @Override
- public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
- }
-
- @Override
- public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
- }
-
- private <A> CompletionState readOrWrite(boolean read,
- ByteBuffer[] buffers, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- IOException ioe = getError();
- if (ioe != null) {
- handler.failed(ioe, attachment);
- return CompletionState.ERROR;
- }
- if (timeout == -1) {
- timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
- } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) {
- if (read) {
- setReadTimeout(unit.toMillis(timeout));
- } else {
- setWriteTimeout(unit.toMillis(timeout));
- }
- }
- if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
- try {
- if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
- }
- } catch (InterruptedException e) {
- handler.failed(e, attachment);
- return CompletionState.ERROR;
- }
- } else {
- if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
- if (block == BlockingMode.NON_BLOCK) {
- return CompletionState.NOT_DONE;
- } else {
- handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
- return CompletionState.ERROR;
- }
- }
- }
- VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
- OperationState<A> state = new OperationState<>(read, buffers, offset, length, block,
- attachment, check, handler, read ? readPending : writePending, completion);
- if (read) {
- readOperation = state;
- } else {
- writeOperation = state;
- }
- state.run();
- if (block == BlockingMode.BLOCK) {
- synchronized (state) {
- if (state.state == CompletionState.PENDING) {
- try {
- state.wait(unit.toMillis(timeout));
- if (state.state == CompletionState.PENDING) {
- return CompletionState.ERROR;
- }
- } catch (InterruptedException e) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
- }
- }
- }
- }
- return state.state;
- }
-
- private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
- @Override
- public void completed(Long nBytes, OperationState<A> state) {
- if (nBytes.longValue() < 0) {
- failed(new EOFException(), state);
- } else {
- state.nBytes += nBytes.longValue();
- CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE;
- boolean complete = true;
- boolean completion = true;
- if (state.check != null) {
- CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
- if (call == CompletionHandlerCall.CONTINUE) {
- complete = false;
- } else if (call == CompletionHandlerCall.NONE) {
- completion = false;
- }
- }
- if (complete) {
- boolean notify = false;
- state.semaphore.release();
- if (state.read) {
- readOperation = null;
- } else {
- writeOperation = null;
- }
- if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
- notify = true;
- } else {
- state.state = currentState;
- }
- if (completion && state.handler != null) {
- state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
- }
- synchronized (state) {
- state.completionDone = true;
- if (notify) {
- state.state = currentState;
- state.notify();
- }
- }
- } else {
- synchronized (state) {
- state.completionDone = true;
- }
- state.run();
- }
- }
- }
- @Override
- public void failed(Throwable exc, OperationState<A> state) {
- IOException ioe = null;
- if (exc instanceof InterruptedByTimeoutException) {
- ioe = new SocketTimeoutException();
- exc = ioe;
- } else if (exc instanceof IOException) {
- ioe = (IOException) exc;
- }
- setError(ioe);
- boolean notify = false;
- state.semaphore.release();
- if (state.read) {
- readOperation = null;
- } else {
- writeOperation = null;
- }
- if (state.block == BlockingMode.BLOCK) {
- notify = true;
- } else {
- state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
- }
- if (state.handler != null) {
- state.handler.failed(exc, state.attachment);
- }
- synchronized (state) {
- state.completionDone = true;
- if (notify) {
- state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE;
- state.notify();
- }
- }
- }
- }
-
}
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index b09284a..60e383d 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -16,11 +16,17 @@
*/
package org.apache.tomcat.util.net;
+import java.io.EOFException;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
+import java.nio.channels.InterruptedByTimeoutException;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -28,6 +34,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.res.StringManager;
public abstract class SocketWrapperBase<E> {
@@ -93,12 +100,24 @@ public abstract class SocketWrapperBase<E> {
*/
protected final WriteBuffer nonBlockingWriteBuffer = new WriteBuffer(bufferedWriteSize);
+ protected final Semaphore readPending;
+ protected OperationState<?> readOperation = null;
+ protected final Semaphore writePending;
+ protected OperationState<?> writeOperation = null;
+
public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
this.socket = socket;
this.endpoint = endpoint;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.blockingStatusReadLock = lock.readLock();
this.blockingStatusWriteLock = lock.writeLock();
+ if (endpoint.getUseAsyncIO() || needSemaphores()) {
+ readPending = new Semaphore(1);
+ writePending = new Semaphore(1);
+ } else {
+ readPending = null;
+ writePending = null;
+ }
}
public E getSocket() {
@@ -952,12 +971,191 @@ public abstract class SocketWrapperBase<E> {
public static final CompletionCheck COMPLETE_READ = COMPLETE_WRITE;
/**
+ * Internal state tracker for vectored operations.
+ */
+ protected abstract class OperationState<A> implements Runnable {
+ protected final boolean read;
+ protected final ByteBuffer[] buffers;
+ protected final int offset;
+ protected final int length;
+ protected final A attachment;
+ protected final long timeout;
+ protected final TimeUnit unit;
+ protected final BlockingMode block;
+ protected final CompletionCheck check;
+ protected final CompletionHandler<Long, ? super A> handler;
+ protected final Semaphore semaphore;
+ protected final VectoredIOCompletionHandler<A> completion;
+ protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+ Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
+ this.read = read;
+ this.buffers = buffers;
+ this.offset = offset;
+ this.length = length;
+ this.block = block;
+ this.timeout = timeout;
+ this.unit = unit;
+ this.attachment = attachment;
+ this.check = check;
+ this.handler = handler;
+ this.semaphore = semaphore;
+ this.completion = completion;
+ }
+ protected volatile long nBytes = 0;
+ protected volatile CompletionState state = CompletionState.PENDING;
+ protected volatile boolean inline = true;
+ protected boolean completionDone = true;
+
+ /**
+ * @return true if the operation is still inline, false if the operation
+ * is running on a thread that is not the original caller
+ */
+ protected abstract boolean isInline();
+
+ /**
+ * Process the operation using the connector executor.
+ * @return true if the operation was accepted, false if the executor
+ * rejected execurtion
+ */
+ protected boolean process() {
+ try {
+ getEndpoint().getExecutor().execute(this);
+ } catch (RejectedExecutionException ree) {
+ log.warn(sm.getString("endpoint.executor.fail", SocketWrapperBase.this) , ree);
+ return false;
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Start the operation, this will typically call run.
+ */
+ protected abstract void start();
+
+ }
+
+ /**
+ * Completion handler for vectored operations. This will check the completion of the operation,
+ * then either continue or call the user provided completion handler.
+ */
+ protected class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
+ @Override
+ public void completed(Long nBytes, OperationState<A> state) {
+ if (nBytes.longValue() < 0) {
+ failed(new EOFException(), state);
+ } else {
+ state.nBytes += nBytes.longValue();
+ CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+ boolean complete = true;
+ boolean completion = true;
+ if (state.check != null) {
+ CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length);
+ if (call == CompletionHandlerCall.CONTINUE) {
+ complete = false;
+ } else if (call == CompletionHandlerCall.NONE) {
+ completion = false;
+ }
+ }
+ if (complete) {
+ boolean notify = false;
+ state.semaphore.release();
+ if (state.read) {
+ readOperation = null;
+ } else {
+ writeOperation = null;
+ }
+ if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
+ notify = true;
+ } else {
+ state.state = currentState;
+ }
+ if (completion && state.handler != null) {
+ state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+ }
+ synchronized (state) {
+ state.completionDone = true;
+ if (notify) {
+ state.state = currentState;
+ state.notify();
+ }
+ }
+ } else {
+ synchronized (state) {
+ state.completionDone = true;
+ }
+ state.run();
+ }
+ }
+ }
+ @Override
+ public void failed(Throwable exc, OperationState<A> state) {
+ IOException ioe = null;
+ if (exc instanceof InterruptedByTimeoutException) {
+ ioe = new SocketTimeoutException();
+ exc = ioe;
+ } else if (exc instanceof IOException) {
+ ioe = (IOException) exc;
+ }
+ setError(ioe);
+ boolean notify = false;
+ state.semaphore.release();
+ if (state.read) {
+ readOperation = null;
+ } else {
+ writeOperation = null;
+ }
+ if (state.block == BlockingMode.BLOCK) {
+ notify = true;
+ } else {
+ state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+ }
+ if (state.handler != null) {
+ state.handler.failed(exc, state.attachment);
+ }
+ synchronized (state) {
+ state.completionDone = true;
+ if (notify) {
+ state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+ state.notify();
+ }
+ }
+ }
+ }
+
+ /**
* Allows using NIO2 style read/write only for connectors that can
* efficiently support it.
*
* @return This default implementation always returns {@code false}
*/
public boolean hasAsyncIO() {
+ // The semaphores are only created if async IO is enabled
+ return (readPending != null);
+ }
+
+ /**
+ * Allows indicating if the connector needs semaphores.
+ *
+ * @return This default implementation always returns {@code false}
+ */
+ public boolean needSemaphores() {
+ return false;
+ }
+
+ /**
+ * Allows indicating if the connector supports per operation timeout.
+ *
+ * @return This default implementation always returns {@code false}
+ */
+ public boolean hasPerOperationTimeout() {
return false;
}
@@ -1086,7 +1284,7 @@ public abstract class SocketWrapperBase<E> {
public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- throw new UnsupportedOperationException();
+ return vectoredOperation(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
}
/**
@@ -1169,12 +1367,111 @@ public abstract class SocketWrapperBase<E> {
public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- throw new UnsupportedOperationException();
+ return vectoredOperation(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
+ }
+
+
+ /**
+ * Vectored operation. The completion handler will be called once
+ * the operation is complete or an error occurred. If a CompletionCheck
+ * object has been provided, the completion handler will only be
+ * called if the callHandler method returned true. If no
+ * CompletionCheck object has been provided, the default NIO2
+ * behavior is used: the completion handler will be called, even
+ * if the operation is incomplete, or if the operation completed inline.
+ *
+ * @param read true if the operation is a read, false if it is a write
+ * @param buffers buffers
+ * @param offset in the buffer array
+ * @param length in the buffer array
+ * @param block is the blocking mode that will be used for this operation
+ * @param timeout timeout duration for the write
+ * @param unit units for the timeout duration
+ * @param attachment an object to attach to the I/O operation that will be
+ * used when calling the completion handler
+ * @param check for the IO operation completion
+ * @param handler to call when the IO is complete
+ * @param <A> The attachment type
+ * @return the completion state (done, done inline, or still pending)
+ */
+ protected <A> CompletionState vectoredOperation(boolean read,
+ ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+ IOException ioe = getError();
+ if (ioe != null) {
+ handler.failed(ioe, attachment);
+ return CompletionState.ERROR;
+ }
+ if (timeout == -1) {
+ timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
+ } else if (!hasPerOperationTimeout() && (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout()))) {
+ if (read) {
+ setReadTimeout(unit.toMillis(timeout));
+ } else {
+ setWriteTimeout(unit.toMillis(timeout));
+ }
+ }
+ if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
+ try {
+ if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
+ handler.failed(new SocketTimeoutException(), attachment);
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(e, attachment);
+ return CompletionState.ERROR;
+ }
+ } else {
+ if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
+ if (block == BlockingMode.NON_BLOCK) {
+ return CompletionState.NOT_DONE;
+ } else {
+ handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
+ return CompletionState.ERROR;
+ }
+ }
+ }
+ VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+ OperationState<A> state = newOperationState(read, buffers, offset, length, block, timeout, unit,
+ attachment, check, handler, read ? readPending : writePending, completion);
+ if (read) {
+ readOperation = state;
+ } else {
+ writeOperation = state;
+ }
+ state.start();
+ if (block == BlockingMode.BLOCK) {
+ synchronized (state) {
+ if (state.state == CompletionState.PENDING) {
+ try {
+ state.wait(unit.toMillis(timeout));
+ if (state.state == CompletionState.PENDING) {
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(new SocketTimeoutException(), attachment);
+ return CompletionState.ERROR;
+ }
+ }
+ }
+ }
+ return state.state;
}
+ protected abstract <A> OperationState<A> newOperationState(boolean read,
+ ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+ Semaphore semaphore, VectoredIOCompletionHandler<A> completion);
// --------------------------------------------------------- Utility methods
+ protected static long toTimeout(long timeout) {
+ // Many calls can't do infinite timeout so use Long.MAX_VALUE if timeout is <= 0
+ return (timeout > 0) ? timeout : Long.MAX_VALUE;
+ }
+
protected static int transfer(byte[] from, int offset, int length, ByteBuffer to) {
int max = Math.min(length, to.remaining());
if (max > 0) {
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 1bbaa2f..459c293 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -108,6 +108,10 @@
<fix>
Avoid blocking write of internal buffer when using async IO. (remm)
</fix>
+ <scode>
+ Refactor async IO implementation to the <code>SocketWrapperBase</code>.
+ (remm)
+ </scode>
</changelog>
</subsection>
<subsection name="Other">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org