You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2023/02/01 18:21:06 UTC
[tomcat] branch main updated: Align async close implementation with Servlet 6.1 clarification
This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push:
new e1fb169adb Align async close implementation with Servlet 6.1 clarification
e1fb169adb is described below
commit e1fb169adb953e826fecb4b15a178636aa7c3a13
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Tue Dec 13 11:22:51 2022 +0000
Align async close implementation with Servlet 6.1 clarification
Includes a test case
---
java/org/apache/coyote/AbstractProcessor.java | 8 +-
java/org/apache/coyote/AbstractProcessorLight.java | 2 +-
java/org/apache/coyote/AsyncStateMachine.java | 9 +-
.../apache/coyote/http11/Http11OutputBuffer.java | 9 +-
.../catalina/nonblocking/TestNonBlockingAPI.java | 157 +++++++++++++++++++++
webapps/docs/changelog.xml | 11 +-
6 files changed, 182 insertions(+), 14 deletions(-)
diff --git a/java/org/apache/coyote/AbstractProcessor.java b/java/org/apache/coyote/AbstractProcessor.java
index 50743ffece..6d3be069c7 100644
--- a/java/org/apache/coyote/AbstractProcessor.java
+++ b/java/org/apache/coyote/AbstractProcessor.java
@@ -193,7 +193,7 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement
@Override
- public SocketState asyncPostProcess() {
+ public SocketState asyncPostProcess() throws IOException {
return asyncStateMachine.asyncPostProcess();
}
@@ -570,7 +570,11 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement
break;
}
case ASYNC_POST_PROCESS: {
- asyncStateMachine.asyncPostProcess();
+ try {
+ asyncStateMachine.asyncPostProcess();
+ } catch (IOException e) {
+ handleIOException(e);
+ }
break;
}
diff --git a/java/org/apache/coyote/AbstractProcessorLight.java b/java/org/apache/coyote/AbstractProcessorLight.java
index 709530dfc8..9ba345fa66 100644
--- a/java/org/apache/coyote/AbstractProcessorLight.java
+++ b/java/org/apache/coyote/AbstractProcessorLight.java
@@ -192,7 +192,7 @@ public abstract class AbstractProcessorLight implements Processor {
*/
protected abstract SocketState dispatch(SocketEvent status) throws IOException;
- protected abstract SocketState asyncPostProcess();
+ protected abstract SocketState asyncPostProcess() throws IOException;
protected abstract Log getLog();
}
diff --git a/java/org/apache/coyote/AsyncStateMachine.java b/java/org/apache/coyote/AsyncStateMachine.java
index b400788831..50b16316a9 100644
--- a/java/org/apache/coyote/AsyncStateMachine.java
+++ b/java/org/apache/coyote/AsyncStateMachine.java
@@ -16,6 +16,7 @@
*/
package org.apache.coyote;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.juli.logging.Log;
@@ -264,7 +265,7 @@ class AsyncStateMachine {
* current state. For example, as per SRV.2.3.3.3 can now process calls to
* complete() or dispatch().
*/
- synchronized SocketState asyncPostProcess() {
+ synchronized SocketState asyncPostProcess() throws IOException {
if (state == AsyncState.COMPLETE_PENDING) {
clearNonBlockingListeners();
updateState(AsyncState.COMPLETING);
@@ -277,6 +278,9 @@ class AsyncStateMachine {
updateState(AsyncState.STARTED);
return SocketState.LONG;
} else if (state == AsyncState.MUST_COMPLETE || state == AsyncState.COMPLETING) {
+ if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) {
+ return SocketState.LONG;
+ }
asyncCtxt.fireOnComplete();
updateState(AsyncState.DISPATCHED);
asyncCtxt.decrementInProgressAsyncCount();
@@ -285,6 +289,9 @@ class AsyncStateMachine {
updateState(AsyncState.DISPATCHING);
return SocketState.ASYNC_END;
} else if (state == AsyncState.DISPATCHING) {
+ if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) {
+ return SocketState.LONG;
+ }
updateState(AsyncState.DISPATCHED);
asyncCtxt.decrementInProgressAsyncCount();
return SocketState.ASYNC_END;
diff --git a/java/org/apache/coyote/http11/Http11OutputBuffer.java b/java/org/apache/coyote/http11/Http11OutputBuffer.java
index 570b90c0d7..e7206a50bd 100644
--- a/java/org/apache/coyote/http11/Http11OutputBuffer.java
+++ b/java/org/apache/coyote/http11/Http11OutputBuffer.java
@@ -565,14 +565,7 @@ public class Http11OutputBuffer implements HttpOutputBuffer {
@Override
public void end() throws IOException {
- /*
- * TODO
- * As of Servlet 6.1, this flush is (currently) meant to be
- * non-blocking if the output stream is in non-blocking mode. That
- * requirement creates various complications I want to discuss with
- * the EG before I try implementing it.
- */
- socketWrapper.flush(true);
+ socketWrapper.flush(response.getWriteListener() == null);
}
@Override
diff --git a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
index 0609eb05b6..018742394c 100644
--- a/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
+++ b/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
@@ -1697,4 +1697,161 @@ public class TestNonBlockingAPI extends TomcatBaseTest {
// NO-OP
}
}
+
+
+ @Test
+ public void testNonBlockingWriteWithClose() throws Exception {
+ AtomicBoolean asyncContextIsComplete = new AtomicBoolean(false);
+ AtomicBoolean asyncContextIsError = new AtomicBoolean(false);
+
+ CountDownLatch beforeCloseLatch = new CountDownLatch(1);
+ CountDownLatch afterCloseLatch = new CountDownLatch(1);
+
+ AtomicInteger written = new AtomicInteger(-1);
+
+ Tomcat tomcat = getTomcatInstance();
+ // Note: Low values of socket.txBufSize can trigger very poor
+ // performance.
+ Assert.assertTrue(tomcat.getConnector().setProperty("socket.txBufSize", "524228"));
+
+ // No file system docBase required
+ Context ctx = tomcat.addContext("", null);
+
+ TesterAccessLogValve alv = new TesterAccessLogValve();
+ ctx.getPipeline().addValve(alv);
+
+ NBWriteWithCloseServlet servlet = new NBWriteWithCloseServlet(
+ asyncContextIsComplete, asyncContextIsError, beforeCloseLatch, afterCloseLatch, written);
+ String servletName = NBWriteWithCloseServlet.class.getName();
+ Tomcat.addServlet(ctx, servletName, servlet);
+ ctx.addServletMappingDecoded("/", servletName);
+
+ tomcat.start();
+
+ SocketFactory factory = SocketFactory.getDefault();
+ Socket s = factory.createSocket("localhost", getPort());
+
+ OutputStream os = s.getOutputStream();
+ os.write(("GET / HTTP/1.1\r\n" +
+ "Host: localhost:" + getPort() + "\r\n" +
+ "Connection: close\r\n" +
+ "\r\n").getBytes(StandardCharsets.ISO_8859_1));
+ os.flush();
+
+ // Wait for Servlet to fill write buffer
+ beforeCloseLatch.await();
+ // Close should return immediately
+ long start = System.nanoTime();
+ afterCloseLatch.await();
+ long duration = System.nanoTime() - start;
+
+ Assert.assertTrue("Close took [" + duration + "] ns", duration < 1_000_000_000);
+
+ // Read the body
+ InputStream is = s.getInputStream();
+ int read = 0;
+ byte[] buffer = new byte[8192];
+ do {
+ read = is.read(buffer);
+ } while (read != -1);
+
+ os.close();
+ is.close();
+ s.close();
+
+ Assert.assertTrue(asyncContextIsComplete.get());
+ }
+
+
+ @WebServlet(asyncSupported = true)
+ public static class NBWriteWithCloseServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+ private final AtomicBoolean asyncContextIsComplete;
+ private final AtomicBoolean asyncContextIsError;
+ private final CountDownLatch beforeCloseLatch;
+ private final CountDownLatch afterCloseLatch;
+ private final AtomicInteger written;
+
+ public NBWriteWithCloseServlet(AtomicBoolean asyncContextIsComplete, AtomicBoolean asyncContextIsError,
+ CountDownLatch beforeCloseLatch, CountDownLatch afterCloseLatch, AtomicInteger written) {
+ this.asyncContextIsComplete = asyncContextIsComplete;
+ this.asyncContextIsError = asyncContextIsError;
+ this.beforeCloseLatch = beforeCloseLatch;
+ this.afterCloseLatch = afterCloseLatch;
+ this.written = written;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ AsyncContext actx = req.startAsync();
+ actx.setTimeout(Long.MAX_VALUE);
+ actx.addListener(new AsyncListener() {
+
+ @Override
+ public void onTimeout(AsyncEvent event) throws IOException {
+ log.info("onTimeout");
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent event) throws IOException {
+ log.info("onStartAsync");
+ }
+
+ @Override
+ public void onError(AsyncEvent event) throws IOException {
+ log.info("AsyncListener.onError");
+ asyncContextIsError.set(true);
+ }
+
+ @Override
+ public void onComplete(AsyncEvent event) throws IOException {
+ log.info("onComplete");
+ asyncContextIsComplete.set(true);
+ }
+ });
+
+ // Register a WriteListener that writes until the buffer is full and then closes the stream
+ ServletOutputStream out = resp.getOutputStream();
+ TestWriteListener03 writeListener = new TestWriteListener03(actx, beforeCloseLatch, afterCloseLatch);
+ out.setWriteListener(writeListener);
+
+ written.set(writeListener.written);
+ }
+ }
+
+
+ private static class TestWriteListener03 implements WriteListener {
+ private final AsyncContext ctx;
+ private final CountDownLatch beforeCloseLatch;
+ private final CountDownLatch afterCloseLatch;
+ int written = 0;
+
+ public TestWriteListener03(AsyncContext ctx, CountDownLatch beforeCloseLatch, CountDownLatch afterCloseLatch) {
+ this.ctx = ctx;
+ this.beforeCloseLatch = beforeCloseLatch;
+ this.afterCloseLatch = afterCloseLatch;
+ }
+
+ @Override
+ public void onWritePossible() throws IOException {
+ if (written == 0) {
+ // Write until the buffer is full and then close the stream
+ while (ctx.getResponse().getOutputStream().isReady()) {
+ ctx.getResponse().getOutputStream().write(DATA, written, CHUNK_SIZE);
+ written += CHUNK_SIZE;
+ }
+ beforeCloseLatch.countDown();
+ ctx.getResponse().getOutputStream().close();
+ afterCloseLatch.countDown();
+ } else {
+ ctx.complete();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.info("WriteListener.onError");
+ throwable.printStackTrace();
+ }
+ }
}
\ No newline at end of file
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index ed849ea3e8..bb3e009784 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -144,10 +144,17 @@
servlets. (markt)
</fix>
<fix>
- Implement clarification from Jakarta Servlet project that Servlets
- mapped to the context root should be mapped for requests to the
+ Implement the clarification from the Jakarta Servlet project that
+ Servlets mapped to the context root should be mapped for requests to the
context root with or without the trailing <code>/</code>. (markt)
</fix>
+ <fix>
+ Implement the clarification from the Jakarta Servlet project that
+ calling <code>ServletOutputStream.close()</code> on a stream in
+ non-blocking mode returns immediately with the stream effectively closed
+ and any data remaining to be written is written in the background by the
+ container. (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="Coyote">
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org