You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/03/20 09:54:59 UTC

[tomcat] branch 8.5.x updated: Refactor write notifications

This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new 85c2e8c  Refactor write notifications
85c2e8c is described below

commit 85c2e8c8b9ee81b9292cada33d0185df56a2a278
Author: remm <re...@apache.org>
AuthorDate: Wed Mar 20 10:54:49 2019 +0100

    Refactor write notifications
    
    Use the model from read notifications, and simplify (normally nesting
    shouldn't be an issue, the code should ensure one notification when the
    write ends, and avoid any until the next write at least), to attempt to
    fix leftover very rare CI failure on TestCoyoteOutputStream. One
    possible issue right now is the write semaphore release, so it needs
    some testing.
---
 java/org/apache/tomcat/util/net/Nio2Endpoint.java | 71 ++++++++++++-----------
 webapps/docs/changelog.xml                        |  3 +
 2 files changed, 40 insertions(+), 34 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index dcd9bb9..992063f 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -38,7 +38,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLSession;
@@ -443,14 +442,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
 
     public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {
 
-        private static final ThreadLocal<AtomicInteger> nestedWriteCompletionCount =
-                new ThreadLocal<AtomicInteger>() {
-            @Override
-            protected AtomicInteger initialValue() {
-                return new AtomicInteger(0);
-            }
-        };
-
         private SendfileData sendfileData = null;
 
         private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
@@ -599,37 +590,38 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                 @Override
                 public void completed(Integer nBytes, ByteBuffer attachment) {
                     writeNotify = false;
+                    boolean notify = false;
                     synchronized (writeCompletionHandler) {
                         if (nBytes.intValue() < 0) {
                             failed(new EOFException(sm.getString("iob.failedwrite")), attachment);
                         } else if (!nonBlockingWriteBuffer.isEmpty()) {
-                            nestedWriteCompletionCount.get().incrementAndGet();
                             // Continue writing data using a gathering write
                             ByteBuffer[] array = nonBlockingWriteBuffer.toArray(attachment);
                             getSocket().write(array, 0, array.length,
                                     toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS,
                                     array, gatheringWriteCompletionHandler);
-                            nestedWriteCompletionCount.get().decrementAndGet();
                         } else if (attachment.hasRemaining()) {
                             // Regular write
-                            nestedWriteCompletionCount.get().incrementAndGet();
                             getSocket().write(attachment, toNio2Timeout(getWriteTimeout()),
                                     TimeUnit.MILLISECONDS, attachment, writeCompletionHandler);
-                            nestedWriteCompletionCount.get().decrementAndGet();
                         } else {
                             // All data has been written
-                            if (writeInterest) {
-                                writeInterest = false;
+                            if (writeInterest && !Nio2Endpoint.isInline()) {
                                 writeNotify = true;
+                                // Set extra flag so that write nesting does not cause multiple notifications
+                                notify = true;
+                            } else {
+                                // Release here since there will be no
+                                // notify/dispatch to do the release.
+                                writePending.release();
                             }
-                            writePending.release();
+                            writeInterest = false;
                         }
                     }
-                    if (writeNotify && nestedWriteCompletionCount.get().get() == 0) {
-                        endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_WRITE, Nio2Endpoint.isInline());
+                    if (notify) {
+                        endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_WRITE, true);
                     }
                 }
-
                 @Override
                 public void failed(Throwable exc, ByteBuffer attachment) {
                     IOException ioe;
@@ -648,31 +640,34 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                 @Override
                 public void completed(Long nBytes, ByteBuffer[] attachment) {
                     writeNotify = false;
+                    boolean notify = false;
                     synchronized (writeCompletionHandler) {
                         if (nBytes.longValue() < 0) {
                             failed(new EOFException(sm.getString("iob.failedwrite")), attachment);
                         } else if (!nonBlockingWriteBuffer.isEmpty() || arrayHasData(attachment)) {
                             // Continue writing data using a gathering write
-                            nestedWriteCompletionCount.get().incrementAndGet();
                             ByteBuffer[] array = nonBlockingWriteBuffer.toArray(attachment);
                             getSocket().write(array, 0, array.length,
                                     toNio2Timeout(getWriteTimeout()), TimeUnit.MILLISECONDS,
                                     array, gatheringWriteCompletionHandler);
-                            nestedWriteCompletionCount.get().decrementAndGet();
                         } else {
                             // All data has been written
-                            if (writeInterest) {
-                                writeInterest = false;
+                            if (writeInterest && !Nio2Endpoint.isInline()) {
                                 writeNotify = true;
+                                // Set extra flag so that write nesting does not cause multiple notifications
+                                notify = true;
+                            } else {
+                                // Release here since there will be no
+                                // notify/dispatch to do the release.
+                                writePending.release();
                             }
-                            writePending.release();
+                            writeInterest = false;
                         }
                     }
-                    if (writeNotify && nestedWriteCompletionCount.get().get() == 0) {
-                        endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_WRITE, Nio2Endpoint.isInline());
+                    if (notify) {
+                        endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_WRITE, true);
                     }
                 }
-
                 @Override
                 public void failed(Throwable exc, ByteBuffer[] attachment) {
                     IOException ioe;
@@ -1027,6 +1022,8 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
             if (timeout == -1) {
                 timeout = toNio2Timeout(getWriteTimeout());
             }
+            // Disable any regular write notifications caused by registerWriteInterest
+            writeNotify = true;
             if (block != BlockingMode.NON_BLOCK) {
                 try {
                     if (!writePending.tryAcquire(timeout, unit)) {
@@ -1227,7 +1224,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
             // indicate the end of a write
             // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
             synchronized (writeCompletionHandler) {
-                if (writePending.tryAcquire()) {
+                if (writeNotify || writePending.tryAcquire()) {
                     // No pending completion handler, so writing to the main buffer
                     // is possible
                     socketBufferHandler.configureWriteBufferForWrite();
@@ -1277,7 +1274,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
             // indicate the end of a write
             // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
             synchronized (writeCompletionHandler) {
-                if (writePending.tryAcquire()) {
+                if (writeNotify || writePending.tryAcquire()) {
                     // No pending completion handler, so writing to the main buffer
                     // is possible
                     socketBufferHandler.configureWriteBufferForWrite();
@@ -1357,7 +1354,9 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
         private boolean flushNonBlocking(boolean hasPermit) throws IOException {
             checkError();
             synchronized (writeCompletionHandler) {
-                if (hasPermit || writePending.tryAcquire()) {
+                if (writeNotify || hasPermit || writePending.tryAcquire()) {
+                    // The code that was notified is now writing its data
+                    writeNotify = false;
                     socketBufferHandler.configureWriteBufferForRead();
                     if (!nonBlockingWriteBuffer.isEmpty()) {
                         ByteBuffer[] array = nonBlockingWriteBuffer.toArray(socketBufferHandler.getWriteBuffer());
@@ -1377,6 +1376,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
                         if (!hasPermit) {
                             writePending.release();
                         }
+                        writeInterest = false;
                     }
                 }
                 return hasDataToWrite();
@@ -1397,7 +1397,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
         @Override
         public boolean hasDataToWrite() {
             synchronized (writeCompletionHandler) {
-                return !socketBufferHandler.isWriteBufferEmpty() ||
+                return !socketBufferHandler.isWriteBufferEmpty() || writeNotify ||
                         !nonBlockingWriteBuffer.isEmpty() || getError() != null;
             }
         }
@@ -1478,9 +1478,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> {
         @Override
         public void registerWriteInterest() {
             synchronized (writeCompletionHandler) {
-                if (writePending.availablePermits() == 0) {
-                    writeInterest = true;
-                } else {
+                // A notification is already being sent
+                if (writeNotify) {
+                    return;
+                }
+                writeInterest = true;
+                if (writePending.availablePermits() == 1) {
                     // If no write is pending, notify
                     getEndpoint().processSocket(this, SocketEvent.OPEN_WRITE, true);
                 }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 79c2636..71dcec4 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -82,6 +82,9 @@
         Fix bad interaction between NIO2 async read API and the regular read.
         (remm)
       </fix>
+      <fix>
+        Refactor NIO2 write pending strategy for the classic IO API. (remm)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Web applications">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org