You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2023/02/01 18:21:06 UTC

[tomcat] branch main updated: Align async close implementation with Servlet 6.1 clarification

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/main by this push:
     new e1fb169adb Align async close implementation with Servlet 6.1 clarification
e1fb169adb is described below

commit e1fb169adb953e826fecb4b15a178636aa7c3a13
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Tue Dec 13 11:22:51 2022 +0000

    Align async close implementation with Servlet 6.1 clarification
    
    Includes a test case
---
 java/org/apache/coyote/AbstractProcessor.java      |   8 +-
 java/org/apache/coyote/AbstractProcessorLight.java |   2 +-
 java/org/apache/coyote/AsyncStateMachine.java      |   9 +-
 .../apache/coyote/http11/Http11OutputBuffer.java   |   9 +-
 .../catalina/nonblocking/TestNonBlockingAPI.java   | 157 +++++++++++++++++++++
 webapps/docs/changelog.xml                         |  11 +-
 6 files changed, 182 insertions(+), 14 deletions(-)

diff --git a/java/org/apache/coyote/AbstractProcessor.java b/java/org/apache/coyote/AbstractProcessor.java
index 50743ffece..6d3be069c7 100644
--- a/java/org/apache/coyote/AbstractProcessor.java
+++ b/java/org/apache/coyote/AbstractProcessor.java
@@ -193,7 +193,7 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement
 
 
     @Override
-    public SocketState asyncPostProcess() {
+    public SocketState asyncPostProcess() throws IOException {
         return asyncStateMachine.asyncPostProcess();
     }
 
@@ -570,7 +570,11 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement
             break;
         }
         case ASYNC_POST_PROCESS: {
-            asyncStateMachine.asyncPostProcess();
+            try {
+                asyncStateMachine.asyncPostProcess();
+            } catch (IOException e) {
+                handleIOException(e);
+            }
             break;
         }
 
diff --git a/java/org/apache/coyote/AbstractProcessorLight.java b/java/org/apache/coyote/AbstractProcessorLight.java
index 709530dfc8..9ba345fa66 100644
--- a/java/org/apache/coyote/AbstractProcessorLight.java
+++ b/java/org/apache/coyote/AbstractProcessorLight.java
@@ -192,7 +192,7 @@ public abstract class AbstractProcessorLight implements Processor {
      */
     protected abstract SocketState dispatch(SocketEvent status) throws IOException;
 
-    protected abstract SocketState asyncPostProcess();
+    protected abstract SocketState asyncPostProcess() throws IOException;
 
     protected abstract Log getLog();
 }
diff --git a/java/org/apache/coyote/AsyncStateMachine.java b/java/org/apache/coyote/AsyncStateMachine.java
index b400788831..50b16316a9 100644
--- a/java/org/apache/coyote/AsyncStateMachine.java
+++ b/java/org/apache/coyote/AsyncStateMachine.java
@@ -16,6 +16,7 @@
  */
 package org.apache.coyote;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.juli.logging.Log;
@@ -264,7 +265,7 @@ class AsyncStateMachine {
      * current state. For example, as per SRV.2.3.3.3 can now process calls to
      * complete() or dispatch().
      */
-    synchronized SocketState asyncPostProcess() {
+    synchronized SocketState asyncPostProcess() throws IOException {
         if (state == AsyncState.COMPLETE_PENDING) {
             clearNonBlockingListeners();
             updateState(AsyncState.COMPLETING);
@@ -277,6 +278,9 @@ class AsyncStateMachine {
             updateState(AsyncState.STARTED);
             return SocketState.LONG;
         } else if (state == AsyncState.MUST_COMPLETE || state == AsyncState.COMPLETING) {
+            if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) {
+                return SocketState.LONG;
+            }
             asyncCtxt.fireOnComplete();
             updateState(AsyncState.DISPATCHED);
             asyncCtxt.decrementInProgressAsyncCount();
@@ -285,6 +289,9 @@ class AsyncStateMachine {
             updateState(AsyncState.DISPATCHING);
             return SocketState.ASYNC_END;
         } else if (state == AsyncState.DISPATCHING) {
+            if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) {
+                return SocketState.LONG;
+            }
             updateState(AsyncState.DISPATCHED);
             asyncCtxt.decrementInProgressAsyncCount();
             return SocketState.ASYNC_END;
diff --git a/java/org/apache/coyote/http11/Http11OutputBuffer.java b/java/org/apache/coyote/http11/Http11OutputBuffer.java
index 570b90c0d7..e7206a50bd 100644
--- a/java/org/apache/coyote/http11/Http11OutputBuffer.java
+++ b/java/org/apache/coyote/http11/Http11OutputBuffer.java
@@ -565,14 +565,7 @@ public class Http11OutputBuffer implements HttpOutputBuffer {
 
         @Override
         public void end() throws IOException {
-            /*
-             * TODO
-             * As of Servlet 6.1, this flush is (currently) meant to be
-             * non-blocking if the output stream is in non-blocking mode. That
-             * requirement creates various complications I want to discuss with
-             * the EG before I try implementing it.
-             */
-            socketWrapper.flush(true);
+            socketWrapper.flush(response.getWriteListener() == null);
         }
 
         @Override
diff --git a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
index 0609eb05b6..018742394c 100644
--- a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
+++ b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
@@ -1697,4 +1697,161 @@ public class TestNonBlockingAPI extends TomcatBaseTest {
             // NO-OP
         }
     }
+
+
+    @Test
+    public void testNonBlockingWriteWithClose() throws Exception {
+        AtomicBoolean asyncContextIsComplete = new AtomicBoolean(false);
+        AtomicBoolean asyncContextIsError = new AtomicBoolean(false);
+
+        CountDownLatch beforeCloseLatch = new CountDownLatch(1);
+        CountDownLatch afterCloseLatch = new CountDownLatch(1);
+
+        AtomicInteger written = new AtomicInteger(-1);
+
+        Tomcat tomcat = getTomcatInstance();
+        // Note: Low values of socket.txBufSize can trigger very poor
+        //       performance.
+        Assert.assertTrue(tomcat.getConnector().setProperty("socket.txBufSize", "524228"));
+
+        // No file system docBase required
+        Context ctx = tomcat.addContext("", null);
+
+        TesterAccessLogValve alv = new TesterAccessLogValve();
+        ctx.getPipeline().addValve(alv);
+
+        NBWriteWithCloseServlet servlet = new NBWriteWithCloseServlet(
+                asyncContextIsComplete, asyncContextIsError, beforeCloseLatch, afterCloseLatch, written);
+        String servletName = NBWriteWithCloseServlet.class.getName();
+        Tomcat.addServlet(ctx, servletName, servlet);
+        ctx.addServletMappingDecoded("/", servletName);
+
+        tomcat.start();
+
+        SocketFactory factory = SocketFactory.getDefault();
+        Socket s = factory.createSocket("localhost", getPort());
+
+        OutputStream os = s.getOutputStream();
+        os.write(("GET / HTTP/1.1\r\n" +
+                "Host: localhost:" + getPort() + "\r\n" +
+                "Connection: close\r\n" +
+                "\r\n").getBytes(StandardCharsets.ISO_8859_1));
+        os.flush();
+
+        // Wait for Servlet to fill write buffer
+        beforeCloseLatch.await();
+        // Close should return immediately
+        long start = System.nanoTime();
+        afterCloseLatch.await();
+        long duration = System.nanoTime() - start;
+
+        Assert.assertTrue("Close took [" + duration + "] ns", duration < 1_000_000_000);
+
+        // Read the body
+        InputStream is = s.getInputStream();
+        int read = 0;
+        byte[] buffer = new byte[8192];
+        do {
+            read = is.read(buffer);
+        } while (read != -1);
+
+        os.close();
+        is.close();
+        s.close();
+
+        Assert.assertTrue(asyncContextIsComplete.get());
+    }
+
+
+    @WebServlet(asyncSupported = true)
+    public static class NBWriteWithCloseServlet extends HttpServlet {
+        private static final long serialVersionUID = 1L;
+        private final AtomicBoolean asyncContextIsComplete;
+        private final AtomicBoolean asyncContextIsError;
+        private final CountDownLatch beforeCloseLatch;
+        private final CountDownLatch afterCloseLatch;
+        private final AtomicInteger written;
+
+        public NBWriteWithCloseServlet(AtomicBoolean asyncContextIsComplete, AtomicBoolean asyncContextIsError,
+                CountDownLatch beforeCloseLatch, CountDownLatch afterCloseLatch, AtomicInteger written) {
+            this.asyncContextIsComplete = asyncContextIsComplete;
+            this.asyncContextIsError = asyncContextIsError;
+            this.beforeCloseLatch = beforeCloseLatch;
+            this.afterCloseLatch = afterCloseLatch;
+            this.written = written;
+        }
+
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+            AsyncContext actx = req.startAsync();
+            actx.setTimeout(Long.MAX_VALUE);
+            actx.addListener(new AsyncListener() {
+
+                @Override
+                public void onTimeout(AsyncEvent event) throws IOException {
+                    log.info("onTimeout");
+                }
+
+                @Override
+                public void onStartAsync(AsyncEvent event) throws IOException {
+                    log.info("onStartAsync");
+                }
+
+                @Override
+                public void onError(AsyncEvent event) throws IOException {
+                    log.info("AsyncListener.onError");
+                    asyncContextIsError.set(true);
+                }
+
+                @Override
+                public void onComplete(AsyncEvent event) throws IOException {
+                    log.info("onComplete");
+                    asyncContextIsComplete.set(true);
+                }
+            });
+
+            // Write until buffer is full
+            ServletOutputStream out = resp.getOutputStream();
+            TestWriteListener03 writeListener = new TestWriteListener03(actx, beforeCloseLatch, afterCloseLatch);
+            out.setWriteListener(writeListener);
+
+            written.set(writeListener.written);
+        }
+    }
+
+
+    private static class TestWriteListener03 implements WriteListener {
+        private final AsyncContext ctx;
+        private final CountDownLatch beforeCloseLatch;
+        private final CountDownLatch afterCloseLatch;
+        int written = 0;
+
+        public TestWriteListener03(AsyncContext ctx, CountDownLatch beforeCloseLatch, CountDownLatch afterCloseLatch) {
+            this.ctx = ctx;
+            this.beforeCloseLatch = beforeCloseLatch;
+            this.afterCloseLatch = afterCloseLatch;
+        }
+
+        @Override
+        public void onWritePossible() throws IOException {
+            if (written == 0) {
+                // Write until the buffer is full and then close the stream
+                while (ctx.getResponse().getOutputStream().isReady()) {
+                    ctx.getResponse().getOutputStream().write(DATA, written, CHUNK_SIZE);
+                    written += CHUNK_SIZE;
+                }
+                beforeCloseLatch.countDown();
+                ctx.getResponse().getOutputStream().close();
+                afterCloseLatch.countDown();
+            } else {
+                ctx.complete();
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            log.info("WriteListener.onError");
+            throwable.printStackTrace();
+        }
+    }
 }
\ No newline at end of file
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index ed849ea3e8..bb3e009784 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -144,10 +144,17 @@
         servlets. (markt)
       </fix>
       <fix>
-        Implement clarification from Jakarta Servlet project that Servlets
-        mapped to the context root should be mapped for requests to the
+        Implement the clarification from the Jakarta Servlet project that
+        Servlets mapped to the context root should be mapped for requests to the
         context root with or without the trailing <code>/</code>. (markt)
       </fix>
+      <fix>
+        Implement the clarification from the Jakarta Servlet project that
+        calling <code>ServletOutputStream.close()</code> on a stream in
+        non-blocking mode returns immediately with the stream effectively closed
+        and any data remaining to be written is written in the background by the
+        container. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Coyote">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org