You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2019/12/10 09:52:36 UTC

[tomcat] branch master updated: Fix async timeouts with HTTP/2

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d4960  Fix async timeouts with HTTP/2
89d4960 is described below

commit 89d4960faf6106f8e4895b1b0e02c07176513dfc
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Tue Dec 10 09:31:29 2019 +0000

    Fix async timeouts with HTTP/2
---
 java/org/apache/coyote/AbstractProtocol.java       |  26 ++--
 java/org/apache/coyote/LocalStrings.properties     |   3 +
 .../coyote/http11/AbstractHttp11Protocol.java      |   5 +
 java/org/apache/coyote/http2/Http2Protocol.java    |  11 ++
 java/org/apache/coyote/http2/StreamProcessor.java  |   5 +-
 test/org/apache/coyote/http2/TestAsyncTimeout.java | 152 ++++++++++++++++-----
 webapps/docs/changelog.xml                         |   4 +
 7 files changed, 162 insertions(+), 44 deletions(-)

diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java
index b32b546..b41ad32 100644
--- a/java/org/apache/coyote/AbstractProtocol.java
+++ b/java/org/apache/coyote/AbstractProtocol.java
@@ -391,11 +391,17 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
 
     public void addWaitingProcessor(Processor processor) {
+        if (getLog().isDebugEnabled()) {
+            getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.add", processor));
+        }
         waitingProcessors.add(processor);
     }
 
 
     public void removeWaitingProcessor(Processor processor) {
+        if (getLog().isDebugEnabled()) {
+            getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.remove", processor));
+        }
         waitingProcessors.remove(processor);
     }
 
@@ -804,11 +810,12 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                     // OpenSSL typically returns null whereas JSSE typically
                     // returns "" when no protocol is negotiated
                     if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
-                        UpgradeProtocol upgradeProtocol =
-                                getProtocol().getNegotiatedProtocol(negotiatedProtocol);
+                        UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                         if (upgradeProtocol != null) {
-                            processor = upgradeProtocol.getProcessor(
-                                    wrapper, getProtocol().getAdapter());
+                            processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
+                            if (getLog().isDebugEnabled()) {
+                                getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
+                            }
                         } else if (negotiatedProtocol.equals("http/1.1")) {
                             // Explicitly negotiated the default protocol.
                             // Obtain a processor below.
@@ -821,9 +828,8 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                             // replace the code below with the commented out
                             // block.
                             if (getLog().isDebugEnabled()) {
-                                getLog().debug(sm.getString(
-                                    "abstractConnectionHandler.negotiatedProcessor.fail",
-                                    negotiatedProtocol));
+                                getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",
+                                        negotiatedProtocol));
                             }
                             return SocketState.CLOSED;
                             /*
@@ -840,13 +846,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                 if (processor == null) {
                     processor = recycledProcessors.pop();
                     if (getLog().isDebugEnabled()) {
-                        getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
-                                processor));
+                        getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
                     }
                 }
                 if (processor == null) {
                     processor = getProtocol().createProcessor();
                     register(processor);
+                    if (getLog().isDebugEnabled()) {
+                        getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
+                    }
                 }
 
                 processor.setSslSupport(
diff --git a/java/org/apache/coyote/LocalStrings.properties b/java/org/apache/coyote/LocalStrings.properties
index 4dddf71..b20d056 100644
--- a/java/org/apache/coyote/LocalStrings.properties
+++ b/java/org/apache/coyote/LocalStrings.properties
@@ -19,6 +19,7 @@ abstractConnectionHandler.ioexception.debug=IOExceptions are normal, ignored
 abstractConnectionHandler.negotiatedProcessor.fail=Failed to create Processor for negotiated protocol [{0}]
 abstractConnectionHandler.oome=Failed to complete processing of a request
 abstractConnectionHandler.process=Processing socket [{0}] with status [{1}]
+abstractConnectionHandler.processorCreate=Created new processor [{0}]
 abstractConnectionHandler.processorPop=Popped processor [{0}] from cache
 abstractConnectionHandler.protocolexception.debug=ProtocolExceptions are normal, ignored
 abstractConnectionHandler.socketexception.debug=SocketExceptions are normal, ignored
@@ -35,6 +36,8 @@ abstractProcessor.socket.ssl=Exception getting SSL attributes
 abstractProtocol.mbeanDeregistrationFailed=Failed to deregister MBean named [{0}] from MBean server [{1}]
 abstractProtocol.processorRegisterError=Error registering request processor
 abstractProtocol.processorUnregisterError=Error unregistering request processor
+abstractProcotol.waitingProcerssor.add=Added processor [{0}] to waiting processors
+abstractProcotol.waitingProcerssor.remove=Removed processor [{0}] from waiting processors
 
 abstractProtocolHandler.asyncTimeoutError=Error processing async timeouts
 abstractProtocolHandler.destroy=Destroying ProtocolHandler [{0}]
diff --git a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
index ab2a482..e8a5b91 100644
--- a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
+++ b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
@@ -39,6 +39,7 @@ import org.apache.coyote.UpgradeToken;
 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
 import org.apache.coyote.http11.upgrade.UpgradeProcessorExternal;
 import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
+import org.apache.coyote.http2.Http2Protocol;
 import org.apache.tomcat.util.buf.StringUtils;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.SSLHostConfig;
@@ -498,6 +499,10 @@ public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
                 }
             }
         }
+
+        if (upgradeProtocol instanceof Http2Protocol) {
+            ((Http2Protocol) upgradeProtocol).setHttp11Protocol(this);
+        }
     }
     @Override
     public UpgradeProtocol getNegotiatedProtocol(String negotiatedName) {
diff --git a/java/org/apache/coyote/http2/Http2Protocol.java b/java/org/apache/coyote/http2/Http2Protocol.java
index f11f2d1..f8ee7ec 100644
--- a/java/org/apache/coyote/http2/Http2Protocol.java
+++ b/java/org/apache/coyote/http2/Http2Protocol.java
@@ -34,6 +34,7 @@ import org.apache.coyote.Request;
 import org.apache.coyote.Response;
 import org.apache.coyote.UpgradeProtocol;
 import org.apache.coyote.UpgradeToken;
+import org.apache.coyote.http11.AbstractHttp11Protocol;
 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
 import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
 import org.apache.tomcat.util.buf.StringUtils;
@@ -91,6 +92,8 @@ public class Http2Protocol implements UpgradeProtocol {
     private boolean useSendfile = true;
     // Compression
     private final CompressionConfig compressionConfig = new CompressionConfig();
+    // Reference to HTTP/1.1 protocol that this instance is configured under
+    private AbstractHttp11Protocol<?> http11Protocol = null;
 
     @Override
     public String getHttpUpgradeName(boolean isSSLEnabled) {
@@ -418,4 +421,12 @@ public class Http2Protocol implements UpgradeProtocol {
     public boolean useCompression(Request request, Response response) {
         return compressionConfig.useCompression(request, response);
     }
+
+
+    public AbstractHttp11Protocol<?> getHttp11Protocol() {
+        return this.http11Protocol;
+    }
+    public void setHttp11Protocol(AbstractHttp11Protocol<?> http11Protocol) {
+        this.http11Protocol = http11Protocol;
+    }
 }
diff --git a/java/org/apache/coyote/http2/StreamProcessor.java b/java/org/apache/coyote/http2/StreamProcessor.java
index c550f19..99e2d78 100644
--- a/java/org/apache/coyote/http2/StreamProcessor.java
+++ b/java/org/apache/coyote/http2/StreamProcessor.java
@@ -71,7 +71,10 @@ class StreamProcessor extends AbstractProcessor {
                 try {
                     state = process(socketWrapper, event);
 
-                    if (state == SocketState.CLOSED) {
+                    if (state == SocketState.LONG) {
+                        handler.getProtocol().getHttp11Protocol().addWaitingProcessor(this);
+                    } else if (state == SocketState.CLOSED) {
+                        handler.getProtocol().getHttp11Protocol().removeWaitingProcessor(this);
                         if (!getErrorState().isConnectionIoAllowed()) {
                             ConnectionException ce = new ConnectionException(sm.getString(
                                     "streamProcessor.error.connection", stream.getConnectionId(),
diff --git a/test/org/apache/coyote/http2/TestAsyncTimeout.java b/test/org/apache/coyote/http2/TestAsyncTimeout.java
index 70a5348..1e97490 100644
--- a/test/org/apache/coyote/http2/TestAsyncTimeout.java
+++ b/test/org/apache/coyote/http2/TestAsyncTimeout.java
@@ -19,6 +19,8 @@ package org.apache.coyote.http2;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.servlet.AsyncContext;
@@ -29,7 +31,6 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.catalina.Context;
@@ -38,7 +39,6 @@ import org.apache.catalina.startup.Tomcat;
 
 public class TestAsyncTimeout extends Http2TestBase {
 
-    @Ignore // Until this HTTP/2 + async timeouts is fixed
     @Test
     public void testTimeout() throws Exception {
         enableHttp2();
@@ -46,9 +46,16 @@ public class TestAsyncTimeout extends Http2TestBase {
         Tomcat tomcat = getTomcatInstance();
 
         Context ctxt = tomcat.addContext("", null);
+        // This is the target of the HTTP/2 upgrade request
         Tomcat.addServlet(ctxt, "simple", new SimpleServlet());
         ctxt.addServletMappingDecoded("/simple", "simple");
-        Wrapper w = Tomcat.addServlet(ctxt, "async", new AsyncTimeoutServlet());
+
+        // This is the servlet that does that actual test
+        // This latch is used to signal that that async thread used by the test
+        // has ended. It isn;t essential to the test but it allows the test to
+        // complete without Tmcat logging an error about a still running thread.
+        CountDownLatch latch = new CountDownLatch(1);
+        Wrapper w = Tomcat.addServlet(ctxt, "async", new AsyncTimeoutServlet(latch));
         w.setAsyncSupported(true);
         ctxt.addServletMappingDecoded("/async", "async");
         tomcat.start();
@@ -58,9 +65,11 @@ public class TestAsyncTimeout extends Http2TestBase {
         sendClientPreface();
         validateHttp2InitialResponse();
 
-        // Reset connection window size after intial response
+        // Reset connection window size after initial response
         sendWindowUpdate(0, SimpleServlet.CONTENT_LENGTH);
 
+        // Include the response body in the trace so we can check for the PASS /
+        // FAIL text.
         output.setTraceBody(true);
 
         // Send request
@@ -74,9 +83,11 @@ public class TestAsyncTimeout extends Http2TestBase {
         // Body
         parser.readFrame(true);
 
-        // Check that the right number of bytes were received
+        // Check that the expected text was received
         String trace = output.getTrace();
         Assert.assertTrue(trace, trace.contains("PASS"));
+
+        latch.await(10, TimeUnit.SECONDS);
     }
 
 
@@ -84,59 +95,132 @@ public class TestAsyncTimeout extends Http2TestBase {
 
         private static final long serialVersionUID = 1L;
 
+        private final CountDownLatch latch;
+
+        public AsyncTimeoutServlet(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
         @Override
         protected void doGet(HttpServletRequest request, HttpServletResponse response)
                 throws IOException {
 
+            // The idea of this test is that the timeout kicks in after 2
+            // seconds and stops the async thread early rather than letting it
+            // complete the full 5 seconds of processing.
             final AsyncContext asyncContext = request.startAsync();
 
             response.setStatus(HttpServletResponse.SC_OK);
             response.setContentType("text/plain");
             response.setCharacterEncoding("UTF-8");
 
-            asyncContext.addListener(new AsyncListener() {
+            // Only want to call complete() once (else we get stack traces in
+            // the logs so use this to track when complete() is called).
+            AtomicBoolean completeCalled = new AtomicBoolean(false);
+            Ticker ticker = new Ticker(asyncContext, completeCalled);
+            TimeoutListener listener = new TimeoutListener(latch, ticker, completeCalled);
+            asyncContext.addListener(listener);
+            asyncContext.setTimeout(2000);
+            ticker.start();
+        }
+    }
 
-                AtomicBoolean ended = new AtomicBoolean(false);
 
-                @Override
-                public void onTimeout(AsyncEvent event) throws IOException {
-                    if (ended.compareAndSet(false, true)) {
-                        PrintWriter pw = event.getAsyncContext().getResponse().getWriter();
-                        pw.write("PASS");
-                        pw.flush();
-                        event.getAsyncContext().complete();
-                    }
-                }
+    private static class Ticker extends Thread {
 
-                @Override
-                public void onStartAsync(AsyncEvent event) throws IOException {
-                    // NO-OP
-                }
+        private final AsyncContext asyncContext;
+        private final AtomicBoolean completeCalled;
+        private volatile boolean running = true;
+
+        public Ticker(AsyncContext asyncContext, AtomicBoolean completeCalled) {
+            this.asyncContext = asyncContext;
+            this.completeCalled = completeCalled;
+        }
+
+        public void end() {
+            running = false;
+        }
 
-                @Override
-                public void onError(AsyncEvent event) throws IOException {
-                    // NO-OP
+        @Override
+        public void run() {
+            try {
+                PrintWriter pw = asyncContext.getResponse().getWriter();
+                int counter = 0;
+
+                // If the test works running will be set too false before
+                // counter reaches 50.
+                while (running && counter < 50) {
+                    Thread.sleep(100);
+                    counter++;
+                    pw.print("Tick " + counter);
                 }
+                // Need to call complete() here if the test fails but complete()
+                // should have been called by the listener. Use the flag to make
+                // sure we only call complete once.
+                if (completeCalled.compareAndSet(false, true)) {
+                    asyncContext.complete();
+                }
+            } catch (IOException | InterruptedException e) {
+                // Ignore
+            }
+        }
+    }
+
+
+    private static class TimeoutListener implements AsyncListener {
 
-                @Override
-                public void onComplete(AsyncEvent event) throws IOException {
-                    if (ended.compareAndSet(false, true)) {
-                        PrintWriter pw = event.getAsyncContext().getResponse().getWriter();
-                        pw.write("FAIL");
-                        pw.flush();
-                    }
+        private final AtomicBoolean ended = new AtomicBoolean(false);
+        private final CountDownLatch latch;
+        private final Ticker ticker;
+        private final AtomicBoolean completeCalled;
+
+        public TimeoutListener(CountDownLatch latch, Ticker ticker, AtomicBoolean completeCalled) {
+            this.latch = latch;
+            this.ticker = ticker;
+            this.completeCalled = completeCalled;
+        }
+
+        @Override
+        public void onTimeout(AsyncEvent event) throws IOException {
+            ticker.end();
+            if (ended.compareAndSet(false, true)) {
+                PrintWriter pw = event.getAsyncContext().getResponse().getWriter();
+                pw.write("PASS");
+                pw.flush();
+                // If the timeout fires we should always need to call complete()
+                // here but use the flag to be safe.
+                if (completeCalled.compareAndSet(false, true)) {
+                    event.getAsyncContext().complete();
                 }
-            });
+            }
+        }
 
+        @Override
+        public void onStartAsync(AsyncEvent event) throws IOException {
+            // NO-OP
+        }
 
-            asyncContext.setTimeout(2000);
+        @Override
+        public void onError(AsyncEvent event) throws IOException {
+            // NO-OP
+        }
 
+        @Override
+        public void onComplete(AsyncEvent event) throws IOException {
+            if (ended.compareAndSet(false, true)) {
+                PrintWriter pw = event.getAsyncContext().getResponse().getWriter();
+                pw.write("FAIL");
+                pw.flush();
+            }
             try {
-                Thread.sleep(4000);
+                // Wait for the async thread to end before we signal that the
+                // test is complete. This avoids logging an exception about a
+                // still running thread when the unit test shuts down.
+                ticker.join();
+                latch.countDown();
             } catch (InterruptedException e) {
                 // Ignore
             }
-            asyncContext.complete();
         }
     }
 }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 5e48ebf..bf24670 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -50,6 +50,10 @@
       <update>
         Simplify NIO blocking read and write. (remm)
       </update>
+      <fix>
+        Ensure that Servlet Asynchronous processing timeouts fire when requests
+        are made using HTTP/2. (markt)
+      </fix>
     </changelog>
   </subsection>
 </section>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: [tomcat] branch master updated: Fix async timeouts with HTTP/2

Posted by Rémy Maucherat <re...@apache.org>.
On Tue, Dec 10, 2019 at 10:52 AM <ma...@apache.org> wrote:

> This is an automated email from the ASF dual-hosted git repository.
>
> markt pushed a commit to branch master
> in repository https://gitbox.apache.org/repos/asf/tomcat.git
>
>
> The following commit(s) were added to refs/heads/master by this push:
>      new 89d4960  Fix async timeouts with HTTP/2
> 89d4960 is described below
>
> commit 89d4960faf6106f8e4895b1b0e02c07176513dfc
> Author: Mark Thomas <ma...@apache.org>
> AuthorDate: Tue Dec 10 09:31:29 2019 +0000
>
>     Fix async timeouts with HTTP/2
>

Ok, good solution, the stream processor is the processor that must get the
timeout.

Rémy