You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/03/03 23:03:12 UTC
[tomcat] branch 8.5.x updated: Update async IO API from Tomcat 9
This is an automated email from the ASF dual-hosted git repository.
remm pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push:
new eef4b21 Update async IO API from Tomcat 9
eef4b21 is described below
commit eef4b218b7f23a6404e8e32f24dded9a0208cc4c
Author: remm <re...@apache.org>
AuthorDate: Mon Mar 4 00:03:04 2019 +0100
Update async IO API from Tomcat 9
Although not actually used in Tomcat 8.5, it is better (and easy) to
keep it up to date.
---
java/org/apache/tomcat/util/net/Nio2Endpoint.java | 260 +++++++++++++--------
.../apache/tomcat/util/net/SocketWrapperBase.java | 88 ++++---
webapps/docs/changelog.xml | 3 +
3 files changed, 216 insertions(+), 135 deletions(-)
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index eefb45b..547ed23 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -31,8 +31,6 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.channels.NetworkChannel;
-import java.nio.channels.ReadPendingException;
-import java.nio.channels.WritePendingException;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -936,86 +934,166 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
* Internal state tracker for scatter/gather operations.
*/
private static class OperationState<A> {
+ private final boolean read;
private final ByteBuffer[] buffers;
private final int offset;
private final int length;
private final A attachment;
private final long timeout;
private final TimeUnit unit;
+ private final BlockingMode block;
private final CompletionCheck check;
private final CompletionHandler<Long, ? super A> handler;
- private OperationState(ByteBuffer[] buffers, int offset, int length,
- long timeout, TimeUnit unit, A attachment, CompletionCheck check,
- CompletionHandler<Long, ? super A> handler) {
+ private final Semaphore semaphore;
+ private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+ Semaphore semaphore) {
+ this.read = read;
this.buffers = buffers;
this.offset = offset;
this.length = length;
+ this.block = block;
this.timeout = timeout;
this.unit = unit;
this.attachment = attachment;
this.check = check;
this.handler = handler;
+ this.semaphore = semaphore;
}
private volatile long nBytes = 0;
private volatile CompletionState state = CompletionState.PENDING;
}
- private class ScatterReadCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
@Override
- public void completed(Long nBytes, OperationState<A> state) {
- if (nBytes.intValue() < 0) {
- failed(new EOFException(), state);
- } else {
- state.nBytes += nBytes.longValue();
- CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
- boolean complete = true;
- boolean completion = true;
- if (state.check != null) {
- switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) {
- case CONTINUE:
- complete = false;
- break;
- case DONE:
- break;
- case NONE:
- completion = false;
- break;
+ public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+ IOException ioe = getError();
+ if (ioe != null) {
+ handler.failed(ioe, attachment);
+ return CompletionState.ERROR;
+ }
+ if (timeout == -1) {
+ timeout = toNio2Timeout(getReadTimeout());
}
+ if (block != BlockingMode.NON_BLOCK) {
+ try {
+ if (!readPending.tryAcquire(timeout, unit)) {
+ handler.failed(new SocketTimeoutException(), attachment);
+ return CompletionState.ERROR;
}
- if (complete) {
- readPending.release();
- state.state = currentState;
- if (completion && state.handler != null) {
- state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+ } catch (InterruptedException e) {
+ handler.failed(e, attachment);
+ return CompletionState.ERROR;
+ }
+ } else {
+ if (!readPending.tryAcquire()) {
+ return CompletionState.NOT_DONE;
+ }
+ }
+ OperationState<A> state = new OperationState<>(true, dsts, offset, length, block,
+ timeout, unit, attachment, check, handler, readPending);
+ VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+ Nio2Endpoint.startInline();
+ long nBytes = 0;
+ if (!socketBufferHandler.isReadBufferEmpty()) {
+ // There is still data inside the main read buffer, use it to fill out the destination buffers
+ synchronized (readCompletionHandler) {
+ // Note: It is not necessary to put this code in the completion handler
+ socketBufferHandler.configureReadBufferForRead();
+ for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
+ nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]);
+ }
+ }
+ if (nBytes > 0) {
+ completion.completed(Long.valueOf(nBytes), state);
+ }
+ }
+ if (nBytes == 0) {
+ getSocket().read(dsts, offset, length, timeout, unit, state, completion);
+ }
+ Nio2Endpoint.endInline();
+ if (block == BlockingMode.BLOCK) {
+ synchronized (state) {
+ if (state.state == CompletionState.PENDING) {
+ try {
+ state.wait(unit.toMillis(timeout));
+ if (state.state == CompletionState.PENDING) {
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(new SocketTimeoutException(), attachment);
+ return CompletionState.ERROR;
}
- } else {
- getSocket().read(state.buffers, state.offset, state.length,
- state.timeout, state.unit, state, this);
}
}
}
+ return state.state;
+ }
+
@Override
- public void failed(Throwable exc, OperationState<A> state) {
- IOException ioe;
- if (exc instanceof IOException) {
- ioe = (IOException) exc;
- } else {
- ioe = new IOException(exc);
+ public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+ IOException ioe = getError();
+ if (ioe != null) {
+ handler.failed(ioe, attachment);
+ return CompletionState.ERROR;
+ }
+ if (timeout == -1) {
+ timeout = toNio2Timeout(getWriteTimeout());
+ }
+ if (block != BlockingMode.NON_BLOCK) {
+ try {
+ if (!writePending.tryAcquire(timeout, unit)) {
+ handler.failed(new SocketTimeoutException(), attachment);
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(e, attachment);
+ return CompletionState.ERROR;
}
- setError(ioe);
- readPending.release();
- if (exc instanceof AsynchronousCloseException) {
- // If already closed, don't call onError and close again
- return;
+ } else {
+ if (!writePending.tryAcquire()) {
+ return CompletionState.NOT_DONE;
}
- state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
- if (state.handler != null) {
- state.handler.failed(ioe, state.attachment);
+ }
+ if (!socketBufferHandler.isWriteBufferEmpty()) {
+ // First flush the main buffer as needed
+ try {
+ doWrite(true);
+ } catch (IOException e) {
+ handler.failed(e, attachment);
+ return CompletionState.ERROR;
+ }
+ }
+ OperationState<A> state = new OperationState<>(false, srcs, offset, length, block,
+ timeout, unit, attachment, check, handler, writePending);
+ VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+ Nio2Endpoint.startInline();
+ // It should be less necessary to check the buffer state as it is easy to flush before
+ getSocket().write(srcs, offset, length, timeout, unit, state, completion);
+ Nio2Endpoint.endInline();
+ if (block == BlockingMode.BLOCK) {
+ synchronized (state) {
+ if (state.state == CompletionState.PENDING) {
+ try {
+ state.wait(unit.toMillis(timeout));
+ if (state.state == CompletionState.PENDING) {
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(new SocketTimeoutException(), attachment);
+ return CompletionState.ERROR;
+ }
+ }
}
}
+ return state.state;
}
- private class GatherWriteCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
+ private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
@Override
public void completed(Long nBytes, OperationState<A> state) {
if (nBytes.longValue() < 0) {
@@ -1038,14 +1116,30 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
}
}
if (complete) {
- writePending.release();
- state.state = currentState;
+ boolean notify = false;
+ state.semaphore.release();
+ if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
+ notify = true;
+ } else {
+ state.state = currentState;
+ }
if (completion && state.handler != null) {
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
}
+ if (notify) {
+ synchronized (state) {
+ state.state = currentState;
+ state.notify();
+ }
+ }
} else {
- getSocket().write(state.buffers, state.offset, state.length,
- state.timeout, state.unit, state, this);
+ if (state.read) {
+ getSocket().read(state.buffers, state.offset, state.length,
+ state.timeout, state.unit, state, this);
+ } else {
+ getSocket().write(state.buffers, state.offset, state.length,
+ state.timeout, state.unit, state, this);
+ }
}
}
}
@@ -1058,63 +1152,23 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
ioe = new IOException(exc);
}
setError(ioe);
- writePending.release();
- state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
- if (state.handler != null) {
- state.handler.failed(ioe, state.attachment);
- }
- }
- }
-
- @Override
- public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
- boolean block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- OperationState<A> state = new OperationState<>(dsts, offset, length, timeout, unit, attachment, check, handler);
- try {
- if ((!block && readPending.tryAcquire()) || (block && readPending.tryAcquire(timeout, unit))) {
- Nio2Endpoint.startInline();
- getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<A>());
- Nio2Endpoint.endInline();
+ boolean notify = false;
+ state.semaphore.release();
+ if (state.block == BlockingMode.BLOCK) {
+ notify = true;
} else {
- throw new ReadPendingException();
+ state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
}
- if (block && state.state == CompletionState.PENDING && readPending.tryAcquire(timeout, unit)) {
- readPending.release();
- }
- } catch (InterruptedException e) {
- handler.failed(e, attachment);
- }
- return state.state;
- }
-
- @Override
- public boolean isWritePending() {
- synchronized (writeCompletionHandler) {
- return writePending.availablePermits() == 0;
- }
- }
-
- @Override
- public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
- boolean block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- OperationState<A> state = new OperationState<>(srcs, offset, length, timeout, unit, attachment, check, handler);
- try {
- if ((!block && writePending.tryAcquire()) || (block && writePending.tryAcquire(timeout, unit))) {
- Nio2Endpoint.startInline();
- getSocket().write(srcs, offset, length, timeout, unit, state, new GatherWriteCompletionHandler<A>());
- Nio2Endpoint.endInline();
- } else {
- throw new WritePendingException();
+ if (state.handler != null) {
+ state.handler.failed(ioe, state.attachment);
}
- if (block && state.state == CompletionState.PENDING && writePending.tryAcquire(timeout, unit)) {
- writePending.release();
+ if (notify) {
+ synchronized (state) {
+ state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+ state.notify();
+ }
}
- } catch (InterruptedException e) {
- handler.failed(e, attachment);
}
- return state.state;
}
/* Callers of this method must:
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index d8e5d22..4f40afd 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -789,12 +789,33 @@ public abstract class SocketWrapperBase<E> {
// ------------------------------------------------------- NIO 2 style APIs
+ public enum BlockingMode {
+ /**
+ * The operation will now block. If there are pending operations,
+ * the operation will return CompletionState.NOT_DONE.
+ */
+ NON_BLOCK,
+ /**
+ * The operation will block until pending operations are completed, but
+ * will not block after performing it.
+ */
+ SEMI_BLOCK,
+ /**
+ * The operation will block until completed.
+ */
+ BLOCK
+ }
+
public enum CompletionState {
/**
* Operation is still pending.
*/
PENDING,
/**
+ * Operation was pending and non blocking.
+ */
+ NOT_DONE,
+ /**
* The operation completed inline.
*/
INLINE,
@@ -853,8 +874,8 @@ public abstract class SocketWrapperBase<E> {
@Override
public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
int offset, int length) {
- for (int i = 0; i < offset; i++) {
- if (buffers[i].remaining() > 0) {
+ for (int i = 0; i < length; i++) {
+ if (buffers[offset + i].remaining() > 0) {
return CompletionHandlerCall.CONTINUE;
}
}
@@ -864,6 +885,23 @@ public abstract class SocketWrapperBase<E> {
};
/**
+ * This utility CompletionCheck will cause the write to fully write
+ * all remaining data. The completion handler will then be called.
+ */
+ public static final CompletionCheck COMPLETE_WRITE_WITH_COMPLETION = new CompletionCheck() {
+ @Override
+ public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
+ int offset, int length) {
+ for (int i = 0; i < length; i++) {
+ if (buffers[offset + i].remaining() > 0) {
+ return CompletionHandlerCall.CONTINUE;
+ }
+ }
+ return CompletionHandlerCall.DONE;
+ }
+ };
+
+ /**
* This utility CompletionCheck will cause the completion handler
* to be called once some data has been read. If the operation
* completes inline, the completion handler will not be called.
@@ -942,11 +980,7 @@ public abstract class SocketWrapperBase<E> {
* behavior is used: the completion handler will be called as soon
* as some data has been read, even if the read has completed inline.
*
- * @param block true to block until any pending read is done, if the
- * timeout occurs and a read is still pending, a
- * ReadPendingException will be thrown; false to
- * not block but any pending read operation will cause
- * a ReadPendingException
+ * @param block is the blocking mode that will be used for this operation
* @param timeout timeout duration for the read
* @param unit units for the timeout duration
* @param attachment an object to attach to the I/O operation that will be
@@ -957,8 +991,9 @@ public abstract class SocketWrapperBase<E> {
* @param <A> The attachment type
* @return the completion state (done, done inline, or still pending)
*/
- public final <A> CompletionState read(boolean block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) {
+ public final <A> CompletionState read(BlockingMode block, long timeout,
+ TimeUnit unit, A attachment, CompletionCheck check,
+ CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) {
if (dsts == null) {
throw new IllegalArgumentException();
}
@@ -977,11 +1012,7 @@ public abstract class SocketWrapperBase<E> {
* @param dsts buffers
* @param offset in the buffer array
* @param length in the buffer array
- * @param block true to block until any pending read is done, if the
- * timeout occurs and a read is still pending, a
- * ReadPendingException will be thrown; false to
- * not block but any pending read operation will cause
- * a ReadPendingException
+ * @param block is the blocking mode that will be used for this operation
* @param timeout timeout duration for the read
* @param unit units for the timeout duration
* @param attachment an object to attach to the I/O operation that will be
@@ -991,9 +1022,9 @@ public abstract class SocketWrapperBase<E> {
* @param <A> The attachment type
* @return the completion state (done, done inline, or still pending)
*/
- public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, boolean block,
- long timeout, TimeUnit unit, A attachment, CompletionCheck check,
- CompletionHandler<Long, ? super A> handler) {
+ public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
throw new UnsupportedOperationException();
}
@@ -1007,11 +1038,7 @@ public abstract class SocketWrapperBase<E> {
* if the write is incomplete and data remains in the buffers, or
* if the write completed inline.
*
- * @param block true to block until any pending write is done, if the
- * timeout occurs and a write is still pending, a
- * WritePendingException will be thrown; false to
- * not block but any pending write operation will cause
- * a WritePendingException
+ * @param block is the blocking mode that will be used for this operation
* @param timeout timeout duration for the write
* @param unit units for the timeout duration
* @param attachment an object to attach to the I/O operation that will be
@@ -1022,8 +1049,9 @@ public abstract class SocketWrapperBase<E> {
* @param <A> The attachment type
* @return the completion state (done, done inline, or still pending)
*/
- public final <A> CompletionState write(boolean block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) {
+ public final <A> CompletionState write(BlockingMode block, long timeout,
+ TimeUnit unit, A attachment, CompletionCheck check,
+ CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) {
if (srcs == null) {
throw new IllegalArgumentException();
}
@@ -1043,11 +1071,7 @@ public abstract class SocketWrapperBase<E> {
* @param srcs buffers
* @param offset in the buffer array
* @param length in the buffer array
- * @param block true to block until any pending write is done, if the
- * timeout occurs and a write is still pending, a
- * WritePendingException will be thrown; false to
- * not block but any pending write operation will cause
- * a WritePendingException
+ * @param block is the blocking mode that will be used for this operation
* @param timeout timeout duration for the write
* @param unit units for the timeout duration
* @param attachment an object to attach to the I/O operation that will be
@@ -1057,9 +1081,9 @@ public abstract class SocketWrapperBase<E> {
* @param <A> The attachment type
* @return the completion state (done, done inline, or still pending)
*/
- public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, boolean block,
- long timeout, TimeUnit unit, A attachment, CompletionCheck check,
- CompletionHandler<Long, ? super A> handler) {
+ public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
throw new UnsupportedOperationException();
}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 38ced8e..e620ee1 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -135,6 +135,9 @@
<a href="https://bugs.openjdk.java.net/browse/JDK-8157404">JRE KeyStore
loading bug</a>. (markt)
</add>
+ <update>
+ Sync with NIO2 async API from Tomcat 9 branch. (remm)
+ </update>
</changelog>
</subsection>
<subsection name="WebSocket">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org