You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2020/11/02 14:53:12 UTC
[tomcat] branch master updated: Additional fix for BZ 64848. Ensure
Processor instances are cleaned up
This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push:
new 85d5a9f Additional fix for BZ 64848. Ensure Processor instances are cleaned up
85d5a9f is described below
commit 85d5a9fab0d25554a698c6b2fb5c69516b84e12e
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon Nov 2 14:52:24 2020 +0000
Additional fix for BZ 64848. Ensure Processor instances are cleaned up
---
java/org/apache/coyote/AbstractProtocol.java | 27 ++++---
.../tomcat/websocket/TestWebSocketFrameClient.java | 4 +-
.../websocket/TestWebSocketFrameClientSSL.java | 8 +-
.../tomcat/websocket/TesterFirehoseServer.java | 87 +++++++++++++++++---
.../tomcat/websocket/server/TestSlowClient.java | 92 ++++++++++++++++++++++
webapps/docs/changelog.xml | 6 ++
6 files changed, 195 insertions(+), 29 deletions(-)
diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java
index e3ce9d7..e6fbaed 100644
--- a/java/org/apache/coyote/AbstractProtocol.java
+++ b/java/org/apache/coyote/AbstractProtocol.java
@@ -40,7 +40,6 @@ import jakarta.servlet.http.HttpUpgradeHandler;
import jakarta.servlet.http.WebConnection;
import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
-import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
import org.apache.juli.logging.Log;
import org.apache.tomcat.InstanceManager;
import org.apache.tomcat.util.ExceptionUtils;
@@ -385,6 +384,15 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
}
+ /*
+ * Primarily for debugging and testing. Could be exposed via JMX if
+ * considered useful.
+ */
+ public int getWaitingProcessorCount() {
+ return waitingProcessors.size();
+ }
+
+
// ----------------------------------------------- Accessors for sub-classes
protected AbstractEndpoint<S,?> getEndpoint() {
@@ -1038,15 +1046,14 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
if (processor != null) {
processor.recycle();
if (processor.isUpgrade()) {
- // UpgradeProcessorInternal instances can utilise AsyncIO.
- // If they do, the processor will not pass through the
- // process() method and be removed from waitingProcessors
- // so do that here.
- if (processor instanceof UpgradeProcessorInternal) {
- if (((UpgradeProcessorInternal) processor).hasAsyncIO()) {
- getProtocol().removeWaitingProcessor(processor);
- }
- }
+ // While UpgradeProcessor instances should not normally be
+ // present in waitingProcessors there are various scenarios
+ // where this can happen. E.g.:
+ // - when AsyncIO is used
+ // - WebSocket I/O error on non-container thread
+ // Err on the side of caution and always try and remove any
+ // UpgradeProcessor instances from waitingProcessors
+ getProtocol().removeWaitingProcessor(processor);
} else {
// After recycling, only instances of UpgradeProcessorBase
// will return true for isUpgrade().
diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java
index aa7036a..f9ad656 100644
--- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java
+++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClient.java
@@ -57,7 +57,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest {
Tomcat tomcat = getTomcatInstance();
// No file system docBase required
Context ctx = tomcat.addContext("", null);
- ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
+ ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMappingDecoded("/", "default");
@@ -80,7 +80,7 @@ public class TestWebSocketFrameClient extends WebSocketBaseTest {
TesterProgrammaticEndpoint.class,
clientEndpointConfig,
new URI("ws://localhost:" + getPort() +
- TesterFirehoseServer.Config.PATH));
+ TesterFirehoseServer.PATH));
CountDownLatch latch =
new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT);
BasicText handler = new BasicText(latch);
diff --git a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java
index d27e4dd..be84b5e 100644
--- a/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java
+++ b/test/org/apache/tomcat/websocket/TestWebSocketFrameClientSSL.java
@@ -45,7 +45,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
Tomcat tomcat = getTomcatInstance();
// No file system docBase required
Context ctx = tomcat.addContext("", null);
- ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
+ ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMappingDecoded("/", "default");
@@ -64,7 +64,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
TesterProgrammaticEndpoint.class,
clientEndpointConfig,
new URI("wss://localhost:" + getPort() +
- TesterFirehoseServer.Config.PATH));
+ TesterFirehoseServer.PATH));
CountDownLatch latch =
new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT);
BasicText handler = new BasicText(latch);
@@ -92,7 +92,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
Tomcat tomcat = getTomcatInstance();
// No file system docBase required
Context ctx = tomcat.addContext("", null);
- ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
+ ctx.addApplicationListener(TesterFirehoseServer.ConfigInline.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMappingDecoded("/", "default");
@@ -111,7 +111,7 @@ public class TestWebSocketFrameClientSSL extends WebSocketBaseTest {
TesterProgrammaticEndpoint.class,
clientEndpointConfig,
new URI("wss://localhost:" + getPort() +
- TesterFirehoseServer.Config.PATH));
+ TesterFirehoseServer.PATH));
// Process incoming messages very slowly
MessageHandler handler = new SleepingText(5000);
diff --git a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
index e16b11b..f070323 100644
--- a/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
+++ b/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
@@ -41,6 +41,8 @@ public class TesterFirehoseServer {
public static final int WAIT_TIME_MILLIS = 300000;
public static final int SEND_TIME_OUT_MILLIS = 5000;
+ public static final String PATH = "/firehose";
+
static {
StringBuilder sb = new StringBuilder(MESSAGE_SIZE);
for (int i = 0; i < MESSAGE_SIZE; i++) {
@@ -50,22 +52,30 @@ public class TesterFirehoseServer {
}
- public static class Config extends TesterEndpointConfig {
+ public static class ConfigInline extends TesterEndpointConfig {
+
+ @Override
+ protected Class<?> getEndpointClass() {
+ return EndpointInline.class;
+ }
+ }
+
- public static final String PATH = "/firehose";
+ public static class ConfigThread extends TesterEndpointConfig {
@Override
protected Class<?> getEndpointClass() {
- return Endpoint.class;
+ return EndpointThread.class;
}
}
- @ServerEndpoint(Config.PATH)
- public static class Endpoint {
+ public abstract static class Endpoint {
+
+ private static final AtomicInteger openConnectionCount = new AtomicInteger(0);
+ private static final AtomicInteger errorCount = new AtomicInteger(0);
- private static AtomicInteger openConnectionCount = new AtomicInteger(0);
- private static AtomicInteger errorCount = new AtomicInteger(0);
+ private final boolean inline;
private volatile boolean started = false;
@@ -77,6 +87,10 @@ public class TesterFirehoseServer {
return errorCount.intValue();
}
+ public Endpoint(boolean inline) {
+ this.inline = inline;
+ }
+
@OnOpen
public void onOpen() {
openConnectionCount.incrementAndGet();
@@ -98,6 +112,46 @@ public class TesterFirehoseServer {
System.out.println("Received " + msg + ", now sending data");
+ Writer writer = new Writer(session);
+
+ if (inline) {
+ writer.doRun();
+ } else {
+ Thread t = new Thread(writer);
+ t.start();
+ }
+ }
+
+ @OnError
+ public void onError(@SuppressWarnings("unused") Throwable t) {
+ errorCount.incrementAndGet();
+ }
+
+ @OnClose
+ public void onClose() {
+ openConnectionCount.decrementAndGet();
+ }
+ }
+
+
+ private static class Writer implements Runnable {
+
+ private final Session session;
+
+ public Writer(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public void run() {
+ try {
+ doRun();
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+
+ public void doRun() throws IOException {
session.getUserProperties().put(
org.apache.tomcat.websocket.Constants.BLOCKING_SEND_TIMEOUT_PROPERTY,
Long.valueOf(SEND_TIME_OUT_MILLIS));
@@ -116,15 +170,22 @@ public class TesterFirehoseServer {
// Flushing should happen automatically on session close
session.close();
}
+ }
- @OnError
- public void onError(@SuppressWarnings("unused") Throwable t) {
- errorCount.incrementAndGet();
+ @ServerEndpoint(PATH)
+ public static class EndpointInline extends Endpoint {
+
+ public EndpointInline() {
+ super(true);
}
+ }
- @OnClose
- public void onClose() {
- openConnectionCount.decrementAndGet();
+
+ @ServerEndpoint(PATH)
+ public static class EndpointThread extends Endpoint {
+
+ public EndpointThread() {
+ super(false);
}
}
}
diff --git a/test/org/apache/tomcat/websocket/server/TestSlowClient.java b/test/org/apache/tomcat/websocket/server/TestSlowClient.java
new file mode 100644
index 0000000..8c73608
--- /dev/null
+++ b/test/org/apache/tomcat/websocket/server/TestSlowClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket.server;
+
+import java.net.URI;
+
+import jakarta.websocket.ClientEndpointConfig;
+import jakarta.websocket.ContainerProvider;
+import jakarta.websocket.MessageHandler;
+import jakarta.websocket.Session;
+import jakarta.websocket.WebSocketContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.Context;
+import org.apache.catalina.servlets.DefaultServlet;
+import org.apache.catalina.startup.Tomcat;
+import org.apache.coyote.AbstractProtocol;
+import org.apache.tomcat.websocket.TesterFirehoseServer;
+import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint;
+import org.apache.tomcat.websocket.WebSocketBaseTest;
+
+public class TestSlowClient extends WebSocketBaseTest {
+
+ @Test
+ public void testSendingFromAppThread() throws Exception {
+ Tomcat tomcat = getTomcatInstance();
+ Context ctx = tomcat.addContext("", null);
+ // Server side endpoint that sends a stream of messages on a new thread
+ // in response to any message received.
+ ctx.addApplicationListener(TesterFirehoseServer.ConfigThread.class.getName());
+ Tomcat.addServlet(ctx, "default", new DefaultServlet());
+ ctx.addServletMappingDecoded("/", "default");
+
+ tomcat.start();
+
+ // WebSocket client
+ WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer();
+ Session wsSession = wsContainer.connectToServer(TesterProgrammaticEndpoint.class,
+ ClientEndpointConfig.Builder.create().build(), new URI("ws://localhost:" + getPort() + TesterFirehoseServer.PATH));
+ // Configure a handler designed to create a backlog causing the server
+ // side write to time out.
+ wsSession.addMessageHandler(new VerySlowHandler());
+
+ // Trigger the sending of the messages from the server
+ wsSession.getBasicRemote().sendText("start");
+
+ // Wait for server to close connection (it shouldn't)
+ // 20s should be long enough even for the slowest CI system. May need to
+ // extend this if not.
+ int count = 0;
+ while (wsSession.isOpen() && count < 200) {
+ Thread.sleep(100);
+ count++;
+ }
+ Assert.assertTrue(wsSession.isOpen());
+ wsSession.close();
+
+ // BZ 64848 (non-container thread variant)
+ // Confirm there are no waiting processors
+ AbstractProtocol<?> protocol = (AbstractProtocol<?>) tomcat.getConnector().getProtocolHandler();
+ Assert.assertEquals(0, protocol.getWaitingProcessorCount());
+ }
+
+
+ public static class VerySlowHandler implements MessageHandler.Whole<String> {
+
+ @Override
+ public void onMessage(String message) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 2ff68a9..7fe24a3 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -145,6 +145,12 @@
</update>
</changelog>
</subsection>
+ <subsection name="WebSocket">
+ <changelog>
+ <bug>64848</bug>: Fix a variation of this memory leak when a write I/O
+ error occurs on a non-container thread. (markt)
+ </changelog>
+ </subsection>
<subsection name="Web applications">
<changelog>
<fix>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org