You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2019/12/10 10:12:36 UTC

[tomcat] branch 8.5.x updated: Fix async timeouts with HTTP/2

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new 8457c90  Fix async timeouts with HTTP/2
8457c90 is described below

commit 8457c908b9a50e6b70f0cbe779b5e2207a2fdcb9
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon Dec 9 17:34:10 2019 +0000

    Fix async timeouts with HTTP/2
---
 java/org/apache/coyote/AbstractProtocol.java       |  26 ++-
 java/org/apache/coyote/LocalStrings.properties     |   3 +
 .../coyote/http11/AbstractHttp11Protocol.java      |   5 +
 java/org/apache/coyote/http2/Http2Protocol.java    |  11 +
 java/org/apache/coyote/http2/StreamProcessor.java  |   5 +-
 test/org/apache/coyote/http2/TestAsyncTimeout.java | 226 +++++++++++++++++++++
 webapps/docs/changelog.xml                         |   8 +
 7 files changed, 274 insertions(+), 10 deletions(-)

diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java
index 684582f..f021791 100644
--- a/java/org/apache/coyote/AbstractProtocol.java
+++ b/java/org/apache/coyote/AbstractProtocol.java
@@ -398,11 +398,17 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
 
 
     public void addWaitingProcessor(Processor processor) {
+        if (getLog().isDebugEnabled()) {
+            getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.add", processor));
+        }
         waitingProcessors.add(processor);
     }
 
 
     public void removeWaitingProcessor(Processor processor) {
+        if (getLog().isDebugEnabled()) {
+            getLog().debug(sm.getString("abstractProcotol.waitingProcerssor.remove", processor));
+        }
         waitingProcessors.remove(processor);
     }
 
@@ -754,11 +760,12 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                     // OpenSSL typically returns null whereas JSSE typically
                     // returns "" when no protocol is negotiated
                     if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
-                        UpgradeProtocol upgradeProtocol =
-                                getProtocol().getNegotiatedProtocol(negotiatedProtocol);
+                        UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                         if (upgradeProtocol != null) {
-                            processor = upgradeProtocol.getProcessor(
-                                    wrapper, getProtocol().getAdapter());
+                            processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
+                            if (getLog().isDebugEnabled()) {
+                                getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
+                            }
                         } else if (negotiatedProtocol.equals("http/1.1")) {
                             // Explicitly negotiated the default protocol.
                             // Obtain a processor below.
@@ -771,9 +778,8 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                             // replace the code below with the commented out
                             // block.
                             if (getLog().isDebugEnabled()) {
-                                getLog().debug(sm.getString(
-                                    "abstractConnectionHandler.negotiatedProcessor.fail",
-                                    negotiatedProtocol));
+                                getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",
+                                        negotiatedProtocol));
                             }
                             return SocketState.CLOSED;
                             /*
@@ -790,13 +796,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
                 if (processor == null) {
                     processor = recycledProcessors.pop();
                     if (getLog().isDebugEnabled()) {
-                        getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
-                                processor));
+                        getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
                     }
                 }
                 if (processor == null) {
                     processor = getProtocol().createProcessor();
                     register(processor);
+                    if (getLog().isDebugEnabled()) {
+                        getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
+                    }
                 }
 
                 processor.setSslSupport(
diff --git a/java/org/apache/coyote/LocalStrings.properties b/java/org/apache/coyote/LocalStrings.properties
index 33fc21f..5262a86 100644
--- a/java/org/apache/coyote/LocalStrings.properties
+++ b/java/org/apache/coyote/LocalStrings.properties
@@ -19,6 +19,7 @@ abstractConnectionHandler.ioexception.debug=IOExceptions are normal, ignored
 abstractConnectionHandler.negotiatedProcessor.fail=Failed to create Processor for negotiated protocol [{0}]
 abstractConnectionHandler.oome=Failed to complete processing of a request
 abstractConnectionHandler.process=Processing socket [{0}] with status [{1}]
+abstractConnectionHandler.processorCreate=Created new processor [{0}]
 abstractConnectionHandler.processorPop=Popped processor [{0}] from cache
 abstractConnectionHandler.protocolexception.debug=ProtocolExceptions are normal, ignored
 abstractConnectionHandler.socketexception.debug=SocketExceptions are normal, ignored
@@ -34,6 +35,8 @@ abstractProcessor.socket.ssl=Exception getting SSL attributes
 abstractProtocol.mbeanDeregistrationFailed=Failed to deregister MBean named [{0}] from MBean server [{1}]
 abstractProtocol.processorRegisterError=Error registering request processor
 abstractProtocol.processorUnregisterError=Error unregistering request processor
+abstractProcotol.waitingProcerssor.add=Added processor [{0}] to waiting processors
+abstractProcotol.waitingProcerssor.remove=Removed processor [{0}] from waiting processors
 
 abstractProtocolHandler.destroy=Destroying ProtocolHandler [{0}]
 abstractProtocolHandler.destroyError=Failed to destroy end point associated with ProtocolHandler [{0}]
diff --git a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
index 0833e2e..599b0eb 100644
--- a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
+++ b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java
@@ -38,6 +38,7 @@ import org.apache.coyote.UpgradeToken;
 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
 import org.apache.coyote.http11.upgrade.UpgradeProcessorExternal;
 import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
+import org.apache.coyote.http2.Http2Protocol;
 import org.apache.tomcat.util.buf.StringUtils;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.SSLHostConfig;
@@ -450,6 +451,10 @@ public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
                 }
             }
         }
+
+        if (upgradeProtocol instanceof Http2Protocol) {
+            ((Http2Protocol) upgradeProtocol).setHttp11Protocol(this);
+        }
     }
     @Override
     public UpgradeProtocol getNegotiatedProtocol(String negotiatedName) {
diff --git a/java/org/apache/coyote/http2/Http2Protocol.java b/java/org/apache/coyote/http2/Http2Protocol.java
index 300e17a..a2ac7c2 100644
--- a/java/org/apache/coyote/http2/Http2Protocol.java
+++ b/java/org/apache/coyote/http2/Http2Protocol.java
@@ -34,6 +34,7 @@ import org.apache.coyote.Request;
 import org.apache.coyote.Response;
 import org.apache.coyote.UpgradeProtocol;
 import org.apache.coyote.UpgradeToken;
+import org.apache.coyote.http11.AbstractHttp11Protocol;
 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
 import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
 import org.apache.tomcat.util.buf.StringUtils;
@@ -90,6 +91,8 @@ public class Http2Protocol implements UpgradeProtocol {
     private boolean initiatePingDisabled = false;
     // Compression
     private final CompressionConfig compressionConfig = new CompressionConfig();
+    // Reference to HTTP/1.1 protocol that this instance is configured under
+    private AbstractHttp11Protocol<?> http11Protocol = null;
 
     @Override
     public String getHttpUpgradeName(boolean isSSLEnabled) {
@@ -409,4 +412,12 @@ public class Http2Protocol implements UpgradeProtocol {
     public boolean useCompression(Request request, Response response) {
         return compressionConfig.useCompression(request, response);
     }
+
+
+    public AbstractHttp11Protocol<?> getHttp11Protocol() {
+        return this.http11Protocol;
+    }
+    public void setHttp11Protocol(AbstractHttp11Protocol<?> http11Protocol) {
+        this.http11Protocol = http11Protocol;
+    }
 }
diff --git a/java/org/apache/coyote/http2/StreamProcessor.java b/java/org/apache/coyote/http2/StreamProcessor.java
index c2aaea7..f3c380d 100644
--- a/java/org/apache/coyote/http2/StreamProcessor.java
+++ b/java/org/apache/coyote/http2/StreamProcessor.java
@@ -68,7 +68,10 @@ class StreamProcessor extends AbstractProcessor {
                 try {
                     state = process(socketWrapper, event);
 
-                    if (state == SocketState.CLOSED) {
+                    if (state == SocketState.LONG) {
+                        handler.getProtocol().getHttp11Protocol().addWaitingProcessor(this);
+                    } else if (state == SocketState.CLOSED) {
+                        handler.getProtocol().getHttp11Protocol().removeWaitingProcessor(this);
                         if (!getErrorState().isConnectionIoAllowed()) {
                             ConnectionException ce = new ConnectionException(sm.getString(
                                     "streamProcessor.error.connection", stream.getConnectionId(),
diff --git a/test/org/apache/coyote/http2/TestAsyncTimeout.java b/test/org/apache/coyote/http2/TestAsyncTimeout.java
new file mode 100644
index 0000000..1e97490
--- /dev/null
+++ b/test/org/apache/coyote/http2/TestAsyncTimeout.java
@@ -0,0 +1,226 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http2;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.Context;
+import org.apache.catalina.Wrapper;
+import org.apache.catalina.startup.Tomcat;
+
+public class TestAsyncTimeout extends Http2TestBase {
+
+    @Test
+    public void testTimeout() throws Exception {
+        enableHttp2();
+
+        Tomcat tomcat = getTomcatInstance();
+
+        Context ctxt = tomcat.addContext("", null);
+        // This is the target of the HTTP/2 upgrade request
+        Tomcat.addServlet(ctxt, "simple", new SimpleServlet());
+        ctxt.addServletMappingDecoded("/simple", "simple");
+
+        // This is the servlet that does that actual test
+        // This latch is used to signal that that async thread used by the test
+        // has ended. It isn;t essential to the test but it allows the test to
+        // complete without Tmcat logging an error about a still running thread.
+        CountDownLatch latch = new CountDownLatch(1);
+        Wrapper w = Tomcat.addServlet(ctxt, "async", new AsyncTimeoutServlet(latch));
+        w.setAsyncSupported(true);
+        ctxt.addServletMappingDecoded("/async", "async");
+        tomcat.start();
+
+        openClientConnection();
+        doHttpUpgrade();
+        sendClientPreface();
+        validateHttp2InitialResponse();
+
+        // Reset connection window size after initial response
+        sendWindowUpdate(0, SimpleServlet.CONTENT_LENGTH);
+
+        // Include the response body in the trace so we can check for the PASS /
+        // FAIL text.
+        output.setTraceBody(true);
+
+        // Send request
+        byte[] frameHeader = new byte[9];
+        ByteBuffer headersPayload = ByteBuffer.allocate(128);
+        buildGetRequest(frameHeader, headersPayload, null, 3, "/async");
+        writeFrame(frameHeader, headersPayload);
+
+        // Headers
+        parser.readFrame(true);
+        // Body
+        parser.readFrame(true);
+
+        // Check that the expected text was received
+        String trace = output.getTrace();
+        Assert.assertTrue(trace, trace.contains("PASS"));
+
+        latch.await(10, TimeUnit.SECONDS);
+    }
+
+
+    public static class AsyncTimeoutServlet extends HttpServlet {
+
+        private static final long serialVersionUID = 1L;
+
+        private final CountDownLatch latch;
+
+        public AsyncTimeoutServlet(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        protected void doGet(HttpServletRequest request, HttpServletResponse response)
+                throws IOException {
+
+            // The idea of this test is that the timeout kicks in after 2
+            // seconds and stops the async thread early rather than letting it
+            // complete the full 5 seconds of processing.
+            final AsyncContext asyncContext = request.startAsync();
+
+            response.setStatus(HttpServletResponse.SC_OK);
+            response.setContentType("text/plain");
+            response.setCharacterEncoding("UTF-8");
+
+            // Only want to call complete() once (else we get stack traces in
+            // the logs so use this to track when complete() is called).
+            AtomicBoolean completeCalled = new AtomicBoolean(false);
+            Ticker ticker = new Ticker(asyncContext, completeCalled);
+            TimeoutListener listener = new TimeoutListener(latch, ticker, completeCalled);
+            asyncContext.addListener(listener);
+            asyncContext.setTimeout(2000);
+            ticker.start();
+        }
+    }
+
+
+    private static class Ticker extends Thread {
+
+        private final AsyncContext asyncContext;
+        private final AtomicBoolean completeCalled;
+        private volatile boolean running = true;
+
+        public Ticker(AsyncContext asyncContext, AtomicBoolean completeCalled) {
+            this.asyncContext = asyncContext;
+            this.completeCalled = completeCalled;
+        }
+
+        public void end() {
+            running = false;
+        }
+
+        @Override
+        public void run() {
+            try {
+                PrintWriter pw = asyncContext.getResponse().getWriter();
+                int counter = 0;
+
+                // If the test works running will be set too false before
+                // counter reaches 50.
+                while (running && counter < 50) {
+                    Thread.sleep(100);
+                    counter++;
+                    pw.print("Tick " + counter);
+                }
+                // Need to call complete() here if the test fails but complete()
+                // should have been called by the listener. Use the flag to make
+                // sure we only call complete once.
+                if (completeCalled.compareAndSet(false, true)) {
+                    asyncContext.complete();
+                }
+            } catch (IOException | InterruptedException e) {
+                // Ignore
+            }
+        }
+    }
+
+
+    private static class TimeoutListener implements AsyncListener {
+
+        private final AtomicBoolean ended = new AtomicBoolean(false);
+        private final CountDownLatch latch;
+        private final Ticker ticker;
+        private final AtomicBoolean completeCalled;
+
+        public TimeoutListener(CountDownLatch latch, Ticker ticker, AtomicBoolean completeCalled) {
+            this.latch = latch;
+            this.ticker = ticker;
+            this.completeCalled = completeCalled;
+        }
+
+        @Override
+        public void onTimeout(AsyncEvent event) throws IOException {
+            ticker.end();
+            if (ended.compareAndSet(false, true)) {
+                PrintWriter pw = event.getAsyncContext().getResponse().getWriter();
+                pw.write("PASS");
+                pw.flush();
+                // If the timeout fires we should always need to call complete()
+                // here but use the flag to be safe.
+                if (completeCalled.compareAndSet(false, true)) {
+                    event.getAsyncContext().complete();
+                }
+            }
+        }
+
+        @Override
+        public void onStartAsync(AsyncEvent event) throws IOException {
+            // NO-OP
+        }
+
+        @Override
+        public void onError(AsyncEvent event) throws IOException {
+            // NO-OP
+        }
+
+        @Override
+        public void onComplete(AsyncEvent event) throws IOException {
+            if (ended.compareAndSet(false, true)) {
+                PrintWriter pw = event.getAsyncContext().getResponse().getWriter();
+                pw.write("FAIL");
+                pw.flush();
+            }
+            try {
+                // Wait for the async thread to end before we signal that the
+                // test is complete. This avoids logging an exception about a
+                // still running thread when the unit test shuts down.
+                ticker.join();
+                latch.countDown();
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+        }
+    }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 29a5ada..b4a7c6a 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -45,6 +45,14 @@
   issues do not "pop up" wrt. others).
 -->
 <section name="Tomcat 8.5.51 (markt)" rtext="in development">
+  <subsection name="Coyote">
+    <changelog>
+      <fix>
+        Ensure that Servlet Asynchronous processing timeouts fire when requests
+        are made using HTTP/2. (markt)
+      </fix>
+    </changelog>
+  </subsection>
 </section>
 <section name="Tomcat 8.5.50 (markt)">
   <subsection name="Catalina">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org