You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2020/11/02 17:15:13 UTC

[tomcat] branch 8.5.x updated: Additional fix for BZ 64848. Ensure Processor instances are cleaned up

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/8.5.x by this push:
     new b648088  Additional fix for BZ 64848. Ensure Processor instances are cleaned up
b648088 is described below

commit b648088ef9237bac54d53e6a70a2da6721531674
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon Nov 2 14:52:24 2020 +0000

    Additional fix for BZ 64848. Ensure Processor instances are cleaned up
---
 java/org/apache/coyote/AbstractProtocol.java       | 30 +++++--
 java/org/apache/tomcat/util/net/NioEndpoint.java   |  1 +
 .../tomcat/websocket/TestWebSocketFrameClient.java |  4 +-
 .../websocket/TestWebSocketFrameClientSSL.java     |  8 +-
 .../tomcat/websocket/TesterFirehoseServer.java     | 87 +++++++++++++++++---
 .../tomcat/websocket/server/TestSlowClient.java    | 92 ++++++++++++++++++++++
 webapps/docs/changelog.xml                         | 10 ++-
 7 files changed, 205 insertions(+), 27 deletions(-)

diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java
index 247a5b0..a26f1ba 100644
--- a/java/org/apache/coyote/AbstractProtocol.java
+++ b/java/org/apache/coyote/AbstractProtocol.java
@@ -415,6 +415,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
     }
 
 
+    /*
+     * Primarily for debugging and testing. Could be exposed via JMX if
+     * considered useful.
+     */
+    public int getWaitingProcessorCount() {
+        return waitingProcessors.size();
+    }
+
+
     // ----------------------------------------------- Accessors for sub-classes
 
     protected AbstractEndpoint<S> getEndpoint() {
@@ -1010,12 +1019,21 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
         private void release(Processor processor) {
             if (processor != null) {
                 processor.recycle();
-                // After recycling, only instances of UpgradeProcessorBase will
-                // return true for isUpgrade().
-                // Instances of UpgradeProcessorBase should not be added to
-                // recycledProcessors since that pool is only for AJP or HTTP
-                // processors
-                if (!processor.isUpgrade()) {
+                if (processor.isUpgrade()) {
+                    // While UpgradeProcessor instances should not normally be
+                    // present in waitingProcessors there are various scenarios
+                    // where this can happen. E.g.:
+                    // - when AsyncIO is used
+                    // - WebSocket I/O error on non-container thread
+                    // Err on the side of caution and always try to remove any
+                    // UpgradeProcessor instances from waitingProcessors
+                    getProtocol().removeWaitingProcessor(processor);
+                } else {
+                    // After recycling, only instances of UpgradeProcessorBase
+                    // will return true for isUpgrade().
+                    // Instances of UpgradeProcessorBase should not be added to
+                    // recycledProcessors since that pool is only for AJP or
+                    // HTTP processors
                     recycledProcessors.push(processor);
                     if (getLog().isDebugEnabled()) {
                         getLog().debug("Pushed Processor [" + processor + "]");
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 08b8a4e..281c914 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -1258,6 +1258,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {
         @Override
         public void close() throws IOException {
             getSocket().close();
+            getEndpoint().getHandler().release(this);
         }
 
 
diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java
index c22f2ee..49f7597 100644
--- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java
+++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java
@@ -57,7 +57,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest {
         Tomcat tomcat = getTomcatInstance();
         // No file system docBase required
         Context ctx = tomcat.addContext("", null);
-        ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
+        ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName());
         Tomcat.addServlet(ctx, "default", new DefaultServlet());
         ctx.addServletMappingDecoded("/", "default");
 
@@ -83,7 +83,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest {
                 TesterProgrammaticEndpoint.class,
                 clientEndpointConfig,
                 new URI("ws://localhost:" + getPort() +
-                        TesterFirehoseServer.Config.PATH));
+                        TesterFirehoseServer.PATH));
         CountDownLatch latch =
                 new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT);
         BasicText handler = new BasicText(latch);
diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java
index 2e8b3b3..ad9366a 100644
--- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java
+++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java
@@ -51,7 +51,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
         Tomcat tomcat = getTomcatInstance();
         // No file system docBase required
         Context ctx = tomcat.addContext("", null);
-        ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
+        ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName());
         Tomcat.addServlet(ctx, "default", new DefaultServlet());
         ctx.addServletMappingDecoded("/", "default");
 
@@ -71,7 +71,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
                 TesterProgrammaticEndpoint.class,
                 clientEndpointConfig,
                 new URI("wss://localhost:" + getPort() +
-                        TesterFirehoseServer.Config.PATH));
+                        TesterFirehoseServer.PATH));
         CountDownLatch latch =
                 new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT);
         BasicText handler = new BasicText(latch);
@@ -99,7 +99,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
         Tomcat tomcat = getTomcatInstance();
         // No file system docBase required
         Context ctx = tomcat.addContext("", null);
-        ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
+        ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName());
         Tomcat.addServlet(ctx, "default", new DefaultServlet());
         ctx.addServletMappingDecoded("/", "default");
 
@@ -119,7 +119,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
                 TesterProgrammaticEndpoint.class,
                 clientEndpointConfig,
                 new URI("wss://localhost:" + getPort() +
-                        TesterFirehoseServer.Config.PATH));
+                        TesterFirehoseServer.PATH));
 
         // Process incoming messages very slowly
         MessageHandler handler = new SleepingText(5000);
diff --git a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
index 8b4b1ef..eb8ed63 100644
--- a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
+++ b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
@@ -41,6 +41,8 @@ public class TesterFirehoseServer {
     public static final int WAIT_TIME_MILLIS = 300000;
     public static final int SEND_TIME_OUT_MILLIS = 5000;
 
+    public static final String PATH = "/firehose";
+
     static {
         StringBuilder sb = new StringBuilder(MESSAGE_SIZE);
         for (int i = 0; i < MESSAGE_SIZE; i++) {
@@ -50,22 +52,30 @@ public class TesterFirehoseServer {
     }
 
 
-    public static class Config extends TesterEndpointConfig {
+    public static class ConfigInline extends TesterEndpointConfig {
+
+        @Override
+        protected Class<?> getEndpointClass() {
+            return EndpointInline.class;
+        }
+    }
+
 
-        public static final String PATH = "/firehose";
+    public static class ConfigThread extends TesterEndpointConfig {
 
         @Override
         protected Class<?> getEndpointClass() {
-            return Endpoint.class;
+            return EndpointThread.class;
         }
     }
 
 
-    @ServerEndpoint(Config.PATH)
-    public static class Endpoint {
+    public abstract static class Endpoint {
+
+        private static final AtomicInteger openConnectionCount = new AtomicInteger(0);
+        private static final AtomicInteger errorCount = new AtomicInteger(0);
 
-        private static AtomicInteger openConnectionCount = new AtomicInteger(0);
-        private static AtomicInteger errorCount = new AtomicInteger(0);
+        private final boolean inline;
 
         private volatile boolean started = false;
 
@@ -77,6 +87,10 @@ public class TesterFirehoseServer {
             return errorCount.intValue();
         }
 
+        public Endpoint(boolean inline) {
+            this.inline = inline;
+        }
+
         @OnOpen
         public void onOpen() {
             openConnectionCount.incrementAndGet();
@@ -98,6 +112,46 @@ public class TesterFirehoseServer {
 
             System.out.println("Received " + msg + ", now sending data");
 
+            Writer writer = new Writer(session);
+
+            if (inline) {
+                writer.doRun();
+            } else {
+                Thread t = new Thread(writer);
+                t.start();
+            }
+        }
+
+        @OnError
+        public void onError(@SuppressWarnings("unused") Throwable t) {
+            errorCount.incrementAndGet();
+        }
+
+        @OnClose
+        public void onClose() {
+            openConnectionCount.decrementAndGet();
+        }
+    }
+
+
+    private static class Writer implements Runnable {
+
+        private final Session session;
+
+        public Writer(Session session) {
+            this.session = session;
+        }
+
+        @Override
+        public void run() {
+            try {
+                doRun();
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            }
+        }
+
+        public void doRun() throws IOException {
             session.getUserProperties().put(
                     org.apache.tomcat.websocket.Constants.BLOCKING_SEND_TIMEOUT_PROPERTY,
                     Long.valueOf(SEND_TIME_OUT_MILLIS));
@@ -116,15 +170,22 @@ public class TesterFirehoseServer {
             // Flushing should happen automatically on session close
             session.close();
         }
+    }
 
-        @OnError
-        public void onError(@SuppressWarnings("unused") Throwable t) {
-            errorCount.incrementAndGet();
+    @ServerEndpoint(PATH)
+    public static class EndpointInline extends Endpoint {
+
+        public EndpointInline() {
+            super(true);
         }
+    }
 
-        @OnClose
-        public void onClose() {
-            openConnectionCount.decrementAndGet();
+
+    @ServerEndpoint(PATH)
+    public static class EndpointThread extends Endpoint {
+
+        public EndpointThread() {
+            super(false);
         }
     }
 }
diff --git a/test/org/apache/tomcat/websocket/server/TestSlowClient.java b/test/org/apache/tomcat/websocket/server/TestSlowClient.java
new file mode 100644
index 0000000..0978dbc
--- /dev/null
+++ b/test/org/apache/tomcat/websocket/server/TestSlowClient.java
@@ -0,0 +1,92 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.websocket.server;
+
+import java.net.URI;
+
+import javax.websocket.ClientEndpointConfig;
+import javax.websocket.ContainerProvider;
+import javax.websocket.MessageHandler;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.Context;
+import org.apache.catalina.servlets.DefaultServlet;
+import org.apache.catalina.startup.Tomcat;
+import org.apache.coyote.AbstractProtocol;
+import org.apache.tomcat.websocket.TesterFirehoseServer;
+import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint;
+import org.apache.tomcat.websocket.WebSocketBaseTest;
+
+public class TestSlowClient extends WebSocketBaseTest {
+
+    @Test
+    public void testSendingFromAppThread() throws Exception {
+        Tomcat tomcat = getTomcatInstance();
+        Context ctx = tomcat.addContext("", null);
+        // Server side endpoint that sends a stream of messages on a new thread
+        // in response to any message received.
+        ctx.addApplicationListener(TesterFirehoseServer.ConfigThread.class.getName());
+        Tomcat.addServlet(ctx, "default", new DefaultServlet());
+        ctx.addServletMappingDecoded("/", "default");
+
+        tomcat.start();
+
+        // WebSocket client
+        WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer();
+        Session wsSession = wsContainer.connectToServer(TesterProgrammaticEndpoint.class,
+                ClientEndpointConfig.Builder.create().build(), new URI("ws://localhost:" + getPort() + TesterFirehoseServer.PATH));
+        // Configure a handler designed to create a backlog causing the server
+        // side write to time out.
+        wsSession.addMessageHandler(new VerySlowHandler());
+
+        // Trigger the sending of the messages from the server
+        wsSession.getBasicRemote().sendText("start");
+
+        // Wait for server to close connection (it shouldn't)
+        // 20s should be long enough even for the slowest CI system. May need to
+        // extend this if not.
+        int count = 0;
+        while (wsSession.isOpen() && count < 200) {
+            Thread.sleep(100);
+            count++;
+        }
+        Assert.assertTrue(wsSession.isOpen());
+        wsSession.close();
+
+        // BZ 64848 (non-container thread variant)
+        // Confirm there are no waiting processors
+        AbstractProtocol<?> protocol = (AbstractProtocol<?>) tomcat.getConnector().getProtocolHandler();
+        Assert.assertEquals(0, protocol.getWaitingProcessorCount());
+    }
+
+
+    public static class VerySlowHandler implements MessageHandler.Whole<String> {
+
+        @Override
+        public void onMessage(String message) {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+        }
+    }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index f26a444..104444c 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -74,7 +74,7 @@
       </fix>
     </changelog>
   </subsection>
-  <subseciton name="Coyote">
+  <subsection name="Coyote">
     <changelog>
       <fix>
         Refactor the HTTP/2 window update handling for padding in data frames to
@@ -96,7 +96,13 @@
         that did not have one. (markt)
       </add>
     </changelog>
-  </subseciton>
+  </subsection>
+  <subsection name="WebSocket">
+    <changelog>
+      <fix><bug>64848</bug>: Fix a variation of this memory leak when a write
+      I/O error occurs on a non-container thread. (markt)</fix>
+    </changelog>
+  </subsection>
   <subsection name="Web applications">
     <changelog>
       <fix>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org