You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/05/14 14:21:04 UTC

[tomcat] branch master updated: Avoid blocking write of internal buffer

This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b7f845  Avoid blocking write of internal buffer
0b7f845 is described below

commit 0b7f845e191e6e88fe364190e4adfa6900d8e468
Author: remm <re...@apache.org>
AuthorDate: Tue May 14 16:20:54 2019 +0200

    Avoid blocking write of internal buffer
    
    This introduces some "useless" code, but its purpose is to align with the
    NIO and APR code before moving code into SocketWrapperBase. There are a
    couple of slightly risky changes (the async write of the write buffer, and
    delaying the setting of the read/writeNotify flags until after acquiring
    the semaphores) that could cause CI instability again (as usual, the
    test suite passes for me ...).
---
 java/org/apache/tomcat/util/net/Nio2Endpoint.java | 215 +++++++++++-----------
 webapps/docs/changelog.xml                        |   3 +-
 2 files changed, 107 insertions(+), 111 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index b56cbad..8f2de8d 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -37,6 +37,7 @@ import java.nio.file.StandardOpenOption;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -953,22 +954,23 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
         /**
          * Internal state tracker for scatter/gather operations.
          */
-        private static class OperationState<A> {
-            private final boolean read;
-            private final ByteBuffer[] buffers;
-            private final int offset;
-            private final int length;
-            private final A attachment;
-            private final long timeout;
-            private final TimeUnit unit;
-            private final BlockingMode block;
-            private final CompletionCheck check;
-            private final CompletionHandler<Long, ? super A> handler;
-            private final Semaphore semaphore;
-            private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+        protected class OperationState<A> implements Runnable {
+            protected final boolean read;
+            protected final ByteBuffer[] buffers;
+            protected final int offset;
+            protected final int length;
+            protected final A attachment;
+            protected final long timeout;
+            protected final TimeUnit unit;
+            protected final BlockingMode block;
+            protected final CompletionCheck check;
+            protected final CompletionHandler<Long, ? super A> handler;
+            protected final Semaphore semaphore;
+            protected final VectoredIOCompletionHandler<A> completion;
+            protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
                     BlockingMode block, long timeout, TimeUnit unit, A attachment,
                     CompletionCheck check, CompletionHandler<Long, ? super A> handler,
-                    Semaphore semaphore) {
+                    Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
                 this.read = read;
                 this.buffers = buffers;
                 this.offset = offset;
@@ -980,102 +982,115 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 this.check = check;
                 this.handler = handler;
                 this.semaphore = semaphore;
+                this.completion = completion;
             }
-            private volatile long nBytes = 0;
-            private volatile CompletionState state = CompletionState.PENDING;
-        }
+            protected volatile long nBytes = 0;
+            protected volatile CompletionState state = CompletionState.PENDING;
 
-        @Override
-        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
-                BlockingMode block, long timeout, TimeUnit unit, A attachment,
-                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
-            IOException ioe = getError();
-            if (ioe != null) {
-                handler.failed(ioe, attachment);
-                return CompletionState.ERROR;
-            }
-            if (timeout == -1) {
-                timeout = toTimeout(getReadTimeout());
+            public boolean isInline() {
+                return Nio2Endpoint.isInline();
             }
-            // Disable any regular read notifications caused by registerReadInterest
-            readNotify = true;
-            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
+
+            public boolean process() {
                 try {
-                    if (!readPending.tryAcquire(timeout, unit)) {
-                        handler.failed(new SocketTimeoutException(), attachment);
-                        return CompletionState.ERROR;
-                    }
-                } catch (InterruptedException e) {
-                    handler.failed(e, attachment);
-                    return CompletionState.ERROR;
-                }
-            } else {
-                if (!readPending.tryAcquire()) {
-                    if (block == BlockingMode.NON_BLOCK) {
-                        return CompletionState.NOT_DONE;
-                    } else {
-                        handler.failed(new ReadPendingException(), attachment);
-                        return CompletionState.ERROR;
-                    }
+                    getEndpoint().getExecutor().execute(this);
+                } catch (RejectedExecutionException ree) {
+                    log.warn(sm.getString("endpoint.executor.fail", Nio2SocketWrapper.this) , ree);
+                    return false;
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    // This means we got an OOM or similar creating a thread, or that
+                    // the pool and its queue are full
+                    log.error(sm.getString("endpoint.process.fail"), t);
+                    return false;
                 }
+                return true;
             }
-            OperationState<A> state = new OperationState<>(true, dsts, offset, length, block,
-                    timeout, unit, attachment, check, handler, readPending);
-            VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
-            Nio2Endpoint.startInline();
-            long nBytes = 0;
-            if (!socketBufferHandler.isReadBufferEmpty()) {
-                // There is still data inside the main read buffer, use it to fill out the destination buffers
-                synchronized (readCompletionHandler) {
-                    // Note: It is not necessary to put this code in the completion handler
-                    socketBufferHandler.configureReadBufferForRead();
-                    for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
-                        nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]);
-                    }
-                }
-                if (nBytes > 0) {
-                    completion.completed(Long.valueOf(nBytes), state);
+
+            public void start() {
+                if (read) {
+                    // Disable any regular read notifications caused by registerReadInterest
+                    readNotify = true;
+                } else {
+                    // Disable any regular write notifications caused by registerWriteInterest
+                    writeNotify = true;
                 }
+                Nio2Endpoint.startInline();
+                run();
+                Nio2Endpoint.endInline();
             }
-            if (nBytes == 0) {
-                getSocket().read(dsts, offset, length, timeout, unit, state, completion);
-            }
-            Nio2Endpoint.endInline();
-            if (block == BlockingMode.BLOCK) {
-                synchronized (state) {
-                    if (state.state == CompletionState.PENDING) {
-                        try {
-                            state.wait(unit.toMillis(timeout));
-                            if (state.state == CompletionState.PENDING) {
-                                return CompletionState.ERROR;
+
+            @Override
+            public void run() {
+                if (read) {
+                    long nBytes = 0;
+                    if (!socketBufferHandler.isReadBufferEmpty()) {
+                        // There is still data inside the main read buffer, use it to fill out the destination buffers
+                        synchronized (readCompletionHandler) {
+                            // Note: It is not necessary to put this code in the completion handler
+                            socketBufferHandler.configureReadBufferForRead();
+                            for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
+                                nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
                             }
-                        } catch (InterruptedException e) {
-                            handler.failed(new SocketTimeoutException(), attachment);
-                            return CompletionState.ERROR;
                         }
+                        if (nBytes > 0) {
+                            completion.completed(Long.valueOf(nBytes), this);
+                        }
+                    }
+                    if (nBytes == 0) {
+                        getSocket().read(buffers, offset, length, timeout, unit, this, completion);
                     }
+                } else {
+                    if (!socketBufferHandler.isWriteBufferEmpty()) {
+                        // First flush the main buffer as needed
+                        socketBufferHandler.configureWriteBufferForRead();
+                        getSocket().write(socketBufferHandler.getWriteBuffer(), null, new CompletionHandler<Integer, Void>() {
+                            @Override
+                            public void completed(Integer result, Void attachment) {
+                                run();
+                            }
+                            @Override
+                            public void failed(Throwable exc, Void attachment) {
+                                handler.failed(exc, OperationState.this.attachment);
+                            }
+                        });
+                        return;
+                    }
+                    // It should be less necessary to check the buffer state as it is easy to flush before
+                    getSocket().write(buffers, offset, length, timeout, unit, this, completion);
                 }
             }
-            return state.state;
+        }
+
+        @Override
+        public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+            return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
         }
 
         @Override
         public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
                 BlockingMode block, long timeout, TimeUnit unit, A attachment,
                 CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+            return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
+        }
+
+        private <A> CompletionState readOrWrite(boolean read,
+                ByteBuffer[] buffers, int offset, int length,
+                BlockingMode block, long timeout, TimeUnit unit, A attachment,
+                CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
             IOException ioe = getError();
             if (ioe != null) {
                 handler.failed(ioe, attachment);
                 return CompletionState.ERROR;
             }
             if (timeout == -1) {
-                timeout = toTimeout(getWriteTimeout());
+                timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
             }
-            // Disable any regular write notifications caused by registerWriteInterest
-            writeNotify = true;
             if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
                 try {
-                    if (!writePending.tryAcquire(timeout, unit)) {
+                    if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
                         handler.failed(new SocketTimeoutException(), attachment);
                         return CompletionState.ERROR;
                     }
@@ -1084,31 +1099,19 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                     return CompletionState.ERROR;
                 }
             } else {
-                if (!writePending.tryAcquire()) {
+                if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
                     if (block == BlockingMode.NON_BLOCK) {
                         return CompletionState.NOT_DONE;
                     } else {
-                        handler.failed(new WritePendingException(), attachment);
+                        handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
                         return CompletionState.ERROR;
                     }
                 }
             }
-            if (!socketBufferHandler.isWriteBufferEmpty()) {
-                // First flush the main buffer as needed
-                try {
-                    doWrite(true);
-                } catch (IOException e) {
-                    handler.failed(e, attachment);
-                    return CompletionState.ERROR;
-                }
-            }
-            OperationState<A> state = new OperationState<>(false, srcs, offset, length, block,
-                    timeout, unit, attachment, check, handler, writePending);
             VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
-            Nio2Endpoint.startInline();
-            // It should be less necessary to check the buffer state as it is easy to flush before
-            getSocket().write(srcs, offset, length, timeout, unit, state, completion);
-            Nio2Endpoint.endInline();
+            OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, timeout, unit,
+                    attachment, check, handler, read ? readPending : writePending, completion);
+            state.start();
             if (block == BlockingMode.BLOCK) {
                 synchronized (state) {
                     if (state.state == CompletionState.PENDING) {
@@ -1134,7 +1137,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                     failed(new EOFException(), state);
                 } else {
                     state.nBytes += nBytes.longValue();
-                    CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+                    CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE;
                     boolean complete = true;
                     boolean completion = true;
                     if (state.check != null) {
@@ -1163,13 +1166,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                             }
                         }
                     } else {
-                        if (state.read) {
-                            getSocket().read(state.buffers, state.offset, state.length,
-                                    state.timeout, state.unit, state, this);
-                        } else {
-                            getSocket().write(state.buffers, state.offset, state.length,
-                                    state.timeout, state.unit, state, this);
-                        }
+                        state.run();
                     }
                 }
             }
@@ -1188,14 +1185,14 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 if (state.block == BlockingMode.BLOCK) {
                     notify = true;
                 } else {
-                    state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                    state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
                 }
                 if (state.handler != null) {
                     state.handler.failed(exc, state.attachment);
                 }
                 if (notify) {
                     synchronized (state) {
-                        state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+                        state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
                         state.notify();
                     }
                 }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 4f83bb2..1bbaa2f 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -106,8 +106,7 @@
         default due to low performance. (remm)
       </add>
       <fix>
-        Avoid blocking write of internal buffer for NIO when using async IO.
-        (remm)
+        Avoid blocking write of internal buffer when using async IO. (remm)
       </fix>
     </changelog>
   </subsection>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org