You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/03/03 23:03:12 UTC

[tomcat] branch 8.5.x updated: Update async IO API from Tomcat 9

This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new eef4b21  Update async IO API from Tomcat 9
eef4b21 is described below

commit eef4b218b7f23a6404e8e32f24dded9a0208cc4c
Author: remm <re...@apache.org>
AuthorDate: Mon Mar 4 00:03:04 2019 +0100

    Update async IO API from Tomcat 9
    
    Although not actually used in Tomcat 8.5, it is better (and easy) to
    keep it up to date.
---
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  | 260 +++++++++++++--------
 .../apache/tomcat/util/net/SocketWrapperBase.java  |  88 ++++---
 webapps/docs/changelog.xml                         |   3 +
 3 files changed, 216 insertions(+), 135 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index eefb45b..547ed23 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -31,8 +31,6 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
 import java.nio.channels.NetworkChannel;
-import java.nio.channels.ReadPendingException;
-import java.nio.channels.WritePendingException;
 import java.nio.file.StandardOpenOption;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -936,86 +934,166 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
          * Internal state tracker for scatter/gather operations.
          */
         private static class OperationState<A> {
+            private final boolean read;
             private final ByteBuffer[] buffers;
             private final int offset;
             private final int length;
             private final A attachment;
             private final long timeout;
             private final TimeUnit unit;
+            private final BlockingMode block;
             private final CompletionCheck check;
             private final CompletionHandler<Long, ? super A> handler;
-            private OperationState(ByteBuffer[] buffers, int offset, int length,
-                    long timeout, TimeUnit unit, A attachment, CompletionCheck check,
-                    CompletionHandler<Long, ? super A> handler) {
+            private final Semaphore semaphore;
+            private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+                    BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                    CompletionCheck check, CompletionHandler<Long, ? super A> handler,
+                    Semaphore semaphore) {
+                this.read = read;
                 this.buffers = buffers;
                 this.offset = offset;
                 this.length = length;
+                this.block = block;
                 this.timeout = timeout;
                 this.unit = unit;
                 this.attachment = attachment;
                 this.check = check;
                 this.handler = handler;
+                this.semaphore = semaphore;
             }
             private volatile long nBytes = 0;
             private volatile CompletionState state = CompletionState.PENDING;
         }
 
-        private class ScatterReadCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
         @Override
-            public void completed(Long nBytes, OperationState<A> state) {
-                if (nBytes.intValue() < 0) {
-                    failed(new EOFException(), state);
-                } else {
-                    state.nBytes += nBytes.longValue();
-                    CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
-                    boolean complete = true;
-                    boolean completion = true;
-                    if (state.check != null) {
-                        switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) {
-                        case CONTINUE:
-                            complete = false;
-                            break;
-                        case DONE:
-                            break;
-                        case NONE:
-                            completion = false;
-                            break;
+        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+            IOException ioe = getError();
+            if (ioe != null) {
+                handler.failed(ioe, attachment);
+                return CompletionState.ERROR;
+            }
+            if (timeout == -1) {
+                timeout = toNio2Timeout(getReadTimeout());
             }
+            if (block != BlockingMode.NON_BLOCK) {
+                try {
+                    if (!readPending.tryAcquire(timeout, unit)) {
+                        handler.failed(new SocketTimeoutException(), attachment);
+                        return CompletionState.ERROR;
                     }
-                    if (complete) {
-                        readPending.release();
-                        state.state = currentState;
-                        if (completion && state.handler != null) {
-                            state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                } catch (InterruptedException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
+                }
+            } else {
+                if (!readPending.tryAcquire()) {
+                    return CompletionState.NOT_DONE;
+                }
+            }
+            OperationState<A> state = new OperationState<>(true, dsts, offset, length, block,
+                    timeout, unit, attachment, check, handler, readPending);
+            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+            Nio2Endpoint.startInline();
+            long nBytes = 0;
+            if (!socketBufferHandler.isReadBufferEmpty()) {
+                // There is still data inside the main read buffer, use it to fill out the destination buffers
+                synchronized (readCompletionHandler) {
+                    // Note: It is not necessary to put this code in the completion handler
+                    socketBufferHandler.configureReadBufferForRead();
+                    for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
+                        nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]);
+                    }
+                }
+                if (nBytes > 0) {
+                    completion.completed(Long.valueOf(nBytes), state);
+                }
+            }
+            if (nBytes == 0) {
+                getSocket().read(dsts, offset, length, timeout, unit, state, completion);
+            }
+            Nio2Endpoint.endInline();
+            if (block == BlockingMode.BLOCK) {
+                synchronized (state) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            state.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), attachment);
+                            return CompletionState.ERROR;
                         }
-                    } else {
-                        getSocket().read(state.buffers, state.offset, state.length,
-                                state.timeout, state.unit, state, this);
                     }
                 }
             }
+            return state.state;
+        }
+
         @Override
-            public void failed(Throwable exc, OperationState<A> state) {
-                IOException ioe;
-                if (exc instanceof IOException) {
-                    ioe = (IOException) exc;
-                } else {
-                    ioe = new IOException(exc);
+        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+            IOException ioe = getError();
+            if (ioe != null) {
+                handler.failed(ioe, attachment);
+                return CompletionState.ERROR;
+            }
+            if (timeout == -1) {
+                timeout = toNio2Timeout(getWriteTimeout());
+            }
+            if (block != BlockingMode.NON_BLOCK) {
+                try {
+                    if (!writePending.tryAcquire(timeout, unit)) {
+                        handler.failed(new SocketTimeoutException(), attachment);
+                        return CompletionState.ERROR;
+                    }
+                } catch (InterruptedException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
                 }
-                setError(ioe);
-                readPending.release();
-                if (exc instanceof AsynchronousCloseException) {
-                    // If already closed, don't call onError and close again
-                    return;
+            } else {
+                if (!writePending.tryAcquire()) {
+                    return CompletionState.NOT_DONE;
                 }
-                state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
-                if (state.handler != null) {
-                    state.handler.failed(ioe, state.attachment);
+            }
+            if (!socketBufferHandler.isWriteBufferEmpty()) {
+                // First flush the main buffer as needed
+                try {
+                    doWrite(true);
+                } catch (IOException e) {
+                    handler.failed(e, attachment);
+                    return CompletionState.ERROR;
+                }
+            }
+            OperationState<A> state = new OperationState<>(false, srcs, offset, length, block,
+                    timeout, unit, attachment, check, handler, writePending);
+            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
+            Nio2Endpoint.startInline();
+            // It should be less necessary to check the buffer state as it is easy to flush before
+            getSocket().write(srcs, offset, length, timeout, unit, state, completion);
+            Nio2Endpoint.endInline();
+            if (block == BlockingMode.BLOCK) {
+                synchronized (state) {
+                    if (state.state == CompletionState.PENDING) {
+                        try {
+                            state.wait(unit.toMillis(timeout));
+                            if (state.state == CompletionState.PENDING) {
+                                return CompletionState.ERROR;
+                            }
+                        } catch (InterruptedException e) {
+                            handler.failed(new SocketTimeoutException(), attachment);
+                            return CompletionState.ERROR;
+                        }
+                    }
                 }
             }
+            return state.state;
         }
 
-        private class GatherWriteCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
+        private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
             @Override
             public void completed(Long nBytes, OperationState<A> state) {
                 if (nBytes.longValue() < 0) {
@@ -1038,14 +1116,30 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                         }
                     }
                     if (complete) {
-                        writePending.release();
-                        state.state = currentState;
+                        boolean notify = false;
+                        state.semaphore.release();
+                        if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) {
+                            notify = true;
+                        } else {
+                            state.state = currentState;
+                        }
                         if (completion && state.handler != null) {
                             state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
                         }
+                        if (notify) {
+                            synchronized (state) {
+                                state.state = currentState;
+                                state.notify();
+                            }
+                        }
                     } else {
-                        getSocket().write(state.buffers, state.offset, state.length,
-                                state.timeout, state.unit, state, this);
+                        if (state.read) {
+                            getSocket().read(state.buffers, state.offset, state.length,
+                                    state.timeout, state.unit, state, this);
+                        } else {
+                            getSocket().write(state.buffers, state.offset, state.length,
+                                    state.timeout, state.unit, state, this);
+                        }
                     }
                 }
             }
@@ -1058,63 +1152,23 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                     ioe = new IOException(exc);
                 }
                 setError(ioe);
-                writePending.release();
-                state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
-                if (state.handler != null) {
-                    state.handler.failed(ioe, state.attachment);
-                }
-            }
-        }
-
-        @Override
-        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
-                boolean block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            OperationState<A> state = new OperationState<>(dsts, offset, length, timeout, unit, attachment, check, handler);
-            try {
-                if ((!block && readPending.tryAcquire()) || (block && readPending.tryAcquire(timeout, unit))) {
-                    Nio2Endpoint.startInline();
-                    getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<A>());
-                    Nio2Endpoint.endInline();
+                boolean notify = false;
+                state.semaphore.release();
+                if (state.block == BlockingMode.BLOCK) {
+                    notify = true;
                 } else {
-                    throw new ReadPendingException();
+                    state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
                 }
-                if (block && state.state == CompletionState.PENDING && readPending.tryAcquire(timeout, unit)) {
-                    readPending.release();
-                }
-            } catch (InterruptedException e) {
-                handler.failed(e, attachment);
-            }
-            return state.state;
-        }
-
-        @Override
-        public boolean isWritePending() {
-            synchronized (writeCompletionHandler) {
-                return writePending.availablePermits() == 0;
-            }
-        }
-
-        @Override
-        public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
-                boolean block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            OperationState<A> state = new OperationState<>(srcs, offset, length, timeout, unit, attachment, check, handler);
-            try {
-                if ((!block && writePending.tryAcquire()) || (block && writePending.tryAcquire(timeout, unit))) {
-                    Nio2Endpoint.startInline();
-                    getSocket().write(srcs, offset, length, timeout, unit, state, new GatherWriteCompletionHandler<A>());
-                    Nio2Endpoint.endInline();
-                } else {
-                    throw new WritePendingException();
+                if (state.handler != null) {
+                    state.handler.failed(ioe, state.attachment);
                 }
-                if (block && state.state == CompletionState.PENDING && writePending.tryAcquire(timeout, unit)) {
-                    writePending.release();
+                if (notify) {
+                    synchronized (state) {
+                        state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                        state.notify();
+                    }
                 }
-            } catch (InterruptedException e) {
-                handler.failed(e, attachment);
             }
-            return state.state;
         }
 
         /* Callers of this method must:
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index d8e5d22..4f40afd 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -789,12 +789,33 @@ public abstract class SocketWrapperBase<E> {
     // ------------------------------------------------------- NIO 2 style APIs
 
 
+    public enum BlockingMode {
+        /**
+         * The operation will not block. If there are pending operations,
+         * the operation will return CompletionState.NOT_DONE.
+         */
+        NON_BLOCK,
+        /**
+         * The operation will block until pending operations are completed, but
+         * will not block after performing it.
+         */
+        SEMI_BLOCK,
+        /**
+         * The operation will block until completed.
+         */
+        BLOCK
+    }
+
     public enum CompletionState {
         /**
          * Operation is still pending.
          */
         PENDING,
         /**
+         * A non blocking operation was not started as another was pending.
+         */
+        NOT_DONE,
+        /**
          * The operation completed inline.
          */
         INLINE,
@@ -853,8 +874,8 @@ public abstract class SocketWrapperBase<E> {
         @Override
         public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
                 int offset, int length) {
-            for (int i = 0; i < offset; i++) {
-                if (buffers[i].remaining() > 0) {
+            for (int i = 0; i < length; i++) {
+                if (buffers[offset + i].remaining() > 0) {
                     return CompletionHandlerCall.CONTINUE;
                 }
             }
@@ -864,6 +885,23 @@ public abstract class SocketWrapperBase<E> {
     };
 
     /**
+     * This utility CompletionCheck will cause the write to fully write
+     * all remaining data. The completion handler will then be called.
+     */
+    public static final CompletionCheck COMPLETE_WRITE_WITH_COMPLETION = new CompletionCheck() {
+        @Override
+        public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers,
+                int offset, int length) {
+            for (int i = 0; i < length; i++) {
+                if (buffers[offset + i].remaining() > 0) {
+                    return CompletionHandlerCall.CONTINUE;
+                }
+            }
+            return CompletionHandlerCall.DONE;
+        }
+    };
+
+    /**
      * This utility CompletionCheck will cause the completion handler
      * to be called once some data has been read. If the operation
      * completes inline, the completion handler will not be called.
@@ -942,11 +980,7 @@ public abstract class SocketWrapperBase<E> {
      * behavior is used: the completion handler will be called as soon
      * as some data has been read, even if the read has completed inline.
      *
-     * @param block true to block until any pending read is done, if the
-     *        timeout occurs and a read is still pending, a
-     *        ReadPendingException will be thrown; false to
-     *        not block but any pending read operation will cause
-     *        a ReadPendingException
+     * @param block is the blocking mode that will be used for this operation
      * @param timeout timeout duration for the read
      * @param unit units for the timeout duration
      * @param attachment an object to attach to the I/O operation that will be
@@ -957,8 +991,9 @@ public abstract class SocketWrapperBase<E> {
      * @param <A> The attachment type
      * @return the completion state (done, done inline, or still pending)
      */
-    public final <A> CompletionState read(boolean block, long timeout, TimeUnit unit, A attachment,
-            CompletionCheck check, CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) {
+    public final <A> CompletionState read(BlockingMode block, long timeout,
+            TimeUnit unit, A attachment, CompletionCheck check,
+            CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) {
         if (dsts == null) {
             throw new IllegalArgumentException();
         }
@@ -977,11 +1012,7 @@ public abstract class SocketWrapperBase<E> {
      * @param dsts buffers
      * @param offset in the buffer array
      * @param length in the buffer array
-     * @param block true to block until any pending read is done, if the
-     *        timeout occurs and a read is still pending, a
-     *        ReadPendingException will be thrown; false to
-     *        not block but any pending read operation will cause
-     *        a ReadPendingException
+     * @param block is the blocking mode that will be used for this operation
      * @param timeout timeout duration for the read
      * @param unit units for the timeout duration
      * @param attachment an object to attach to the I/O operation that will be
@@ -991,9 +1022,9 @@ public abstract class SocketWrapperBase<E> {
      * @param <A> The attachment type
      * @return the completion state (done, done inline, or still pending)
      */
-    public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, boolean block,
-            long timeout, TimeUnit unit, A attachment, CompletionCheck check,
-            CompletionHandler<Long, ? super A> handler) {
+    public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
+            BlockingMode block, long timeout, TimeUnit unit, A attachment,
+            CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
         throw new UnsupportedOperationException();
     }
 
@@ -1007,11 +1038,7 @@ public abstract class SocketWrapperBase<E> {
      * if the write is incomplete and data remains in the buffers, or
      * if the write completed inline.
      *
-     * @param block true to block until any pending write is done, if the
-     *        timeout occurs and a write is still pending, a
-     *        WritePendingException will be thrown; false to
-     *        not block but any pending write operation will cause
-     *        a WritePendingException
+     * @param block is the blocking mode that will be used for this operation
      * @param timeout timeout duration for the write
      * @param unit units for the timeout duration
      * @param attachment an object to attach to the I/O operation that will be
@@ -1022,8 +1049,9 @@ public abstract class SocketWrapperBase<E> {
      * @param <A> The attachment type
      * @return the completion state (done, done inline, or still pending)
      */
-    public final <A> CompletionState write(boolean block, long timeout, TimeUnit unit, A attachment,
-            CompletionCheck check, CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) {
+    public final <A> CompletionState write(BlockingMode block, long timeout,
+            TimeUnit unit, A attachment, CompletionCheck check,
+            CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) {
         if (srcs == null) {
             throw new IllegalArgumentException();
         }
@@ -1043,11 +1071,7 @@ public abstract class SocketWrapperBase<E> {
      * @param srcs buffers
      * @param offset in the buffer array
      * @param length in the buffer array
-     * @param block true to block until any pending write is done, if the
-     *        timeout occurs and a write is still pending, a
-     *        WritePendingException will be thrown; false to
-     *        not block but any pending write operation will cause
-     *        a WritePendingException
+     * @param block is the blocking mode that will be used for this operation
      * @param timeout timeout duration for the write
      * @param unit units for the timeout duration
      * @param attachment an object to attach to the I/O operation that will be
@@ -1057,9 +1081,9 @@ public abstract class SocketWrapperBase<E> {
      * @param <A> The attachment type
      * @return the completion state (done, done inline, or still pending)
      */
-    public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, boolean block,
-            long timeout, TimeUnit unit, A attachment, CompletionCheck check,
-            CompletionHandler<Long, ? super A> handler) {
+    public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
+            BlockingMode block, long timeout, TimeUnit unit, A attachment,
+            CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
         throw new UnsupportedOperationException();
     }
 
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 38ced8e..e620ee1 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -135,6 +135,9 @@
         <a href="https://bugs.openjdk.java.net/browse/JDK-8157404">JRE KeyStore
         loading bug</a>. (markt)
       </add>
+      <update>
+        Sync with NIO2 async API from Tomcat 9 branch. (remm)
+      </update>
     </changelog>
   </subsection>
   <subsection name="WebSocket">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org