You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2023/06/01 10:12:04 UTC
[tomcat] branch 8.5.x updated: BZ 66513: Enforce one concurrent request per session requirement (#623)
This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 8.5.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push:
new 1a1fccf2ad BZ 66513: Enforce one concurrent request per session requirement (#623)
1a1fccf2ad is described below
commit 1a1fccf2ad3f5a59297d787f4217e2c1c6bc7b96
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu Jun 1 11:16:16 2023 +0200
BZ 66513: Enforce one concurrent request per session requirement (#623)
---
.../apache/catalina/session/PersistentManager.java | 2 +
.../apache/catalina/valves/PersistentValve.java | 347 ++++++++++++++++-----
.../catalina/valves/TestPersistentValve.java | 99 ++++++
webapps/docs/changelog.xml | 6 +
webapps/docs/config/valve.xml | 21 ++
5 files changed, 400 insertions(+), 75 deletions(-)
diff --git a/java/org/apache/catalina/session/PersistentManager.java b/java/org/apache/catalina/session/PersistentManager.java
index 5fd7cd8ea2..815c9e38fd 100644
--- a/java/org/apache/catalina/session/PersistentManager.java
+++ b/java/org/apache/catalina/session/PersistentManager.java
@@ -24,6 +24,8 @@ package org.apache.catalina.session;
* <li>Fault tolerance, keep sessions backed up on disk to allow recovery in the event of unplanned restarts.</li>
* <li>Limit the number of active sessions kept in memory by swapping less active sessions out to disk.</li>
* </ul>
+ * If used with a load-balancer, the load-balancer must be configured to use sticky sessions for this manager to operate
+ * correctly.
*
* @author Kief Morris (kief@kief.com)
*/
diff --git a/java/org/apache/catalina/valves/PersistentValve.java b/java/org/apache/catalina/valves/PersistentValve.java
index e33d2b0de7..6fe241822c 100644
--- a/java/org/apache/catalina/valves/PersistentValve.java
+++ b/java/org/apache/catalina/valves/PersistentValve.java
@@ -17,6 +17,10 @@
package org.apache.catalina.valves;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -36,13 +40,31 @@ import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
/**
- * Valve that implements per-request session persistence. It is intended to be used with non-sticky load-balancers.
+ * Valve that implements per-request session persistence. It is intended to be used with non-sticky load-balancers and a
+ * PersistentManager. The Valve works by loading the session from the Store at the start of the request, the request
+ * then updates the session as required and the Valve saves the session to the Store at the end of the request.
* <p>
- * <b>USAGE CONSTRAINT</b>: To work correctly it requires a PersistentManager.
+ * To avoid conflicts and/or errors when updating the session store, each session must only be accessed by no more than
+ * one concurrent request. The {@code filter} field can be used to define requests (e.g. those for static resources)
+ * that do not need access to the session and can Requests for resources that do not need to access the session and can
+ * bypass the session load/save functionality provided by this Valve.
* <p>
- * <b>USAGE CONSTRAINT</b>: To work correctly it assumes only one request exists per session at any one time.
- *
- * @author Jean-Frederic Clere
+ * The Valve uses a per session {@code Semaphore} to ensure that each session is accessed by no more than one request at
+ * a time within a single Tomcat instance. The behaviour if multiple requests try to access the session concurrently can
+ * be controlled by the {@code semaphoreFairness}, {@code semaphoreBlockOnAcquire} and {@code
+ * semaphoreAcquireUninterruptibly} fields. If a request fails to obtain the Semaphore, the response is generated by the
+ * {@link #onSemaphoreNotAcquired(Request, Response)} method which, by default, returns a {@code 429} status code.
+ * <p>
+ * The per session Semaphores only provide limited protection against concurrent requests within a single Tomcat
+ * instance. If multiple requests access the same session concurrently across different Tomcat instances, update
+ * conflicts and/or session data loss and/or errors are very likely.
+ * <p>
+ * <b>USAGE CONSTRAINTS</b>:
+ * <ul>
+ * <li>This Valve must only be used with a PersistentManager</li>
+ * <li>The client must ensure that no more than one concurrent request accesses a session at any time across all Tomcat
+ * instances</li>
+ * </ul>
*/
public class PersistentValve extends ValveBase {
@@ -55,15 +77,20 @@ public class PersistentValve extends ValveBase {
protected Pattern filter = null;
- // ------------------------------------------------------ Constructor
+ private final Map<String,UsageCountingSemaphore> sessionToSemaphoreMap = new HashMap<>();
+
+ private boolean semaphoreFairness = true;
+
+ private boolean semaphoreBlockOnAcquire = true;
+
+ private boolean semaphoreAcquireUninterruptibly = true;
+
public PersistentValve() {
super(true);
}
- // --------------------------------------------------------- Public Methods
-
@Override
public void setContainer(Container container) {
super.setContainer(container);
@@ -101,72 +128,106 @@ public class PersistentValve extends ValveBase {
return;
}
- // Update the session last access time for our session (if any)
String sessionId = request.getRequestedSessionId();
- Manager manager = context.getManager();
- if (sessionId != null && manager instanceof StoreManager) {
- Store store = ((StoreManager) manager).getStore();
- if (store != null) {
- Session session = null;
- try {
- session = store.load(sessionId);
- } catch (Exception e) {
- container.getLogger().error("deserializeError");
+ UsageCountingSemaphore semaphore = null;
+ boolean mustReleaseSemaphore = true;
+
+ try {
+ // Acquire the per session semaphore
+ if (sessionId != null) {
+ synchronized (sessionToSemaphoreMap) {
+ semaphore = sessionToSemaphoreMap.get(sessionId);
+ if (semaphore == null) {
+ semaphore = new UsageCountingSemaphore(semaphoreFairness);
+ sessionToSemaphoreMap.put(sessionId, semaphore);
+ } else {
+ semaphore.incrementUsageCount();
+ }
}
- if (session != null) {
- if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) {
- if (container.getLogger().isDebugEnabled()) {
- container.getLogger().debug("session swapped in is invalid or expired");
- }
- session.expire();
- store.remove(sessionId);
+ if (semaphoreBlockOnAcquire) {
+ if (semaphoreAcquireUninterruptibly) {
+ semaphore.acquireUninterruptibly();
} else {
- session.setManager(manager);
- // session.setId(sessionId); Only if new ???
- manager.add(session);
- // ((StandardSession)session).activate();
- session.access();
- session.endAccess();
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ mustReleaseSemaphore = false;
+ onSemaphoreNotAcquired(request, response);
+ return;
+ }
+ }
+ } else {
+ if (!semaphore.tryAcquire()) {
+ onSemaphoreNotAcquired(request, response);
+ return;
}
}
}
- }
- if (container.getLogger().isDebugEnabled()) {
- container.getLogger().debug("sessionId: " + sessionId);
- }
-
- // Ask the next valve to process the request.
- getNext().invoke(request, response);
- // If still processing async, don't try to store the session
- if (!request.isAsync()) {
- // Read the sessionid after the response.
- // HttpSession hsess = hreq.getSession(false);
- Session hsess;
- try {
- hsess = request.getSessionInternal(false);
- } catch (Exception ex) {
- hsess = null;
- }
- String newsessionId = null;
- if (hsess != null) {
- newsessionId = hsess.getIdInternal();
+ // Update the session last access time for our session (if any)
+ Manager manager = context.getManager();
+ if (sessionId != null && manager instanceof StoreManager) {
+ Store store = ((StoreManager) manager).getStore();
+ if (store != null) {
+ Session session = null;
+ try {
+ session = store.load(sessionId);
+ } catch (Exception e) {
+ container.getLogger().error("deserializeError");
+ }
+ if (session != null) {
+ if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) {
+ if (container.getLogger().isDebugEnabled()) {
+ container.getLogger().debug("session swapped in is invalid or expired");
+ }
+ session.expire();
+ store.remove(sessionId);
+ } else {
+ session.setManager(manager);
+ // session.setId(sessionId); Only if new ???
+ manager.add(session);
+ // ((StandardSession)session).activate();
+ session.access();
+ session.endAccess();
+ }
+ }
+ }
}
-
if (container.getLogger().isDebugEnabled()) {
- container.getLogger().debug("newsessionId: " + newsessionId);
+ container.getLogger().debug("sessionId: " + sessionId);
}
- if (newsessionId != null) {
+
+ // Ask the next valve to process the request.
+ getNext().invoke(request, response);
+
+ // If still processing async, don't try to store the session
+ if (!request.isAsync()) {
+ // Read the sessionid after the response.
+ // HttpSession hsess = hreq.getSession(false);
+ Session hsess;
try {
- bind(context);
-
- /* store the session and remove it from the manager */
- if (manager instanceof StoreManager) {
- Session session = manager.findSession(newsessionId);
- Store store = ((StoreManager) manager).getStore();
- boolean stored = false;
- if (session != null) {
- synchronized (session) {
+ hsess = request.getSessionInternal(false);
+ } catch (Exception ex) {
+ hsess = null;
+ }
+ String newsessionId = null;
+ if (hsess != null) {
+ newsessionId = hsess.getIdInternal();
+ }
+
+ if (container.getLogger().isDebugEnabled()) {
+ container.getLogger().debug("newsessionId: " + newsessionId);
+ }
+ if (newsessionId != null) {
+ try {
+ bind(context);
+
+ /* store the session and remove it from the manager */
+ if (manager instanceof StoreManager) {
+ Session session = manager.findSession(newsessionId);
+ Store store = ((StoreManager) manager).getStore();
+ boolean stored = false;
+ if (session != null) {
if (store != null && session.isValid() &&
!isSessionStale(session, System.currentTimeMillis())) {
store.save(session);
@@ -176,28 +237,55 @@ public class PersistentValve extends ValveBase {
}
}
- }
- if (!stored) {
+ if (!stored) {
+ if (container.getLogger().isDebugEnabled()) {
+ container.getLogger()
+ .debug("newsessionId store: " + store + " session: " + session +
+ " valid: " +
+ (session == null ? "N/A" : Boolean.toString(session.isValid())) +
+ " stale: " + isSessionStale(session, System.currentTimeMillis()));
+ }
+ }
+ } else {
if (container.getLogger().isDebugEnabled()) {
- container.getLogger()
- .debug("newsessionId store: " + store + " session: " + session + " valid: " +
- (session == null ? "N/A" : Boolean.toString(session.isValid())) +
- " stale: " + isSessionStale(session, System.currentTimeMillis()));
+ container.getLogger().debug("newsessionId Manager: " + manager);
}
}
- } else {
- if (container.getLogger().isDebugEnabled()) {
- container.getLogger().debug("newsessionId Manager: " + manager);
- }
+ } finally {
+ unbind(context);
+ }
+ }
+ }
+ } finally {
+ if (semaphore != null) {
+ if (mustReleaseSemaphore) {
+ semaphore.release();
+ }
+ synchronized (sessionToSemaphoreMap) {
+ long usage = semaphore.decrementAndGetUsageCount();
+ if (usage == 0) {
+ sessionToSemaphoreMap.remove(sessionId);
}
- } finally {
- unbind(context);
}
}
}
}
+ /**
+ * Handle the case where a semaphore cannot be obtained. The default behaviour is to return a 429 (too many
+ * requests) status code.
+ *
+ * @param request The request that will not be processed
+ * @param response The response that will be used for this request
+ *
+ * @throws IOException If an I/O error occurs while working with the request or response
+ */
+ protected void onSemaphoreNotAcquired(Request request, Response response) throws IOException {
+ response.sendError(429);
+ }
+
+
/**
* Indicate whether the session has been idle for longer than its expiration date as of the supplied time. FIXME:
* Probably belongs in the Session class.
@@ -259,4 +347,113 @@ public class PersistentValve extends ValveBase {
}
}
}
+
+
+ /**
+ * If multiple threads attempt to acquire the same per session Semaphore, will permits be granted in the same order
+ * they were requested?
+ *
+ * @return {@code true} if fairness is enabled, otherwise {@code false}
+ */
+ public boolean isSemaphoreFairness() {
+ return semaphoreFairness;
+ }
+
+
+ /**
+ * Configure whether the per session Semaphores will handle granting of permits in the same order they were
+ * requested if multiple threads attempt to acquire the same Semaphore.
+ *
+ * @param semaphoreFairness {@code true} if permits should be granted in the same order they are requested,
+ * otherwise {@code false}
+ */
+ public void setSemaphoreFairness(boolean semaphoreFairness) {
+ this.semaphoreFairness = semaphoreFairness;
+ }
+
+
+ /**
+ * If a thread attempts to acquire the per session Semaphore while it is being used by another request, should the
+ * thread block to wait for the Semaphore or should the request be rejected?
+ *
+ * @return {@code true} if the thread should block, otherwise {@code false} to reject the concurrent request
+ */
+ public boolean isSemaphoreBlockOnAcquire() {
+ return semaphoreBlockOnAcquire;
+ }
+
+
+ /**
+ * Configure whether a thread should block and wait for the per session Semaphore or reject the request if the
+ * Semaphore is being used by another request.
+ *
+ * @param semaphoreBlockOnAcquire {@code true} to block, otherwise {@code false}
+ */
+ public void setSemaphoreBlockOnAcquire(boolean semaphoreBlockOnAcquire) {
+ this.semaphoreBlockOnAcquire = semaphoreBlockOnAcquire;
+ }
+
+
+ /**
+ * If a thread is blocking to acquire a per session Semaphore, can that thread be interrupted?
+ *
+ * @return {@code true} if the thread can <b>not</b> be interrupted, otherwise {@code false}.
+ */
+ public boolean isSemaphoreAcquireUninterruptibly() {
+ return semaphoreAcquireUninterruptibly;
+ }
+
+
+ /**
+ * Configure whether a thread blocking to acquire a per session Semaphore can be interrupted.
+ *
+ * @param semaphoreAcquireUninterruptibly {@code true} if the thread can <b>not</b> be interrupted, otherwise
+ * {@code false}.
+ */
+ public void setSemaphoreAcquireUninterruptibly(boolean semaphoreAcquireUninterruptibly) {
+ this.semaphoreAcquireUninterruptibly = semaphoreAcquireUninterruptibly;
+ }
+
+
+ /*
+ * The PersistentValve uses a per session semaphore to ensure that only one request accesses a session at a time. To
+ * limit the size of the session ID to Semaphore map, the Semaphores are created when required and destroyed (made
+ * eligible for GC) as soon as they are not required. Tracking usage in a thread-safe way requires a usage counter
+ * that does not block. The Semaphore's internal tracking can't be used because the only way to increment usage is
+ * via the acquire methods and they block. Therefore, this class was created which uses a separate AtomicLong long
+ * to track usage.
+ */
+ private static class UsageCountingSemaphore {
+ private final AtomicLong usageCount = new AtomicLong(1);
+ private final Semaphore semaphore;
+
+ private UsageCountingSemaphore(boolean fairness) {
+ semaphore = new Semaphore(1, fairness);
+ }
+
+ private UsageCountingSemaphore incrementUsageCount() {
+ usageCount.incrementAndGet();
+ return this;
+ }
+
+ private long decrementAndGetUsageCount() {
+ return usageCount.decrementAndGet();
+ }
+
+ private void acquire() throws InterruptedException {
+ semaphore.acquire();
+ }
+
+ private void acquireUninterruptibly() {
+ semaphore.acquireUninterruptibly();
+ }
+
+ private boolean tryAcquire() {
+ return semaphore.tryAcquire();
+ }
+
+ private void release() {
+ semaphore.release();
+ }
+ }
}
diff --git a/test/org/apache/catalina/valves/TestPersistentValve.java b/test/org/apache/catalina/valves/TestPersistentValve.java
new file mode 100644
index 0000000000..847f31454d
--- /dev/null
+++ b/test/org/apache/catalina/valves/TestPersistentValve.java
@@ -0,0 +1,99 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.valves;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.ServletException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.connector.Request;
+import org.apache.catalina.connector.Response;
+import org.apache.tomcat.unittest.TesterRequest;
+import org.apache.tomcat.unittest.TesterResponse;
+
+public class TestPersistentValve {
+
+ @Test
+ public void testSemaphore() throws Exception {
+ // Create the test objects
+ final PersistentValve pv = new PersistentValve();
+ final Request request = new TesterRequest();
+ final Response response = new TesterResponse();
+ TesterValve testerValve = new TesterValve();
+
+ // Configure the test objects
+ request.setRequestedSessionId("1234");
+
+ // Plumb the test objects together
+ pv.setContainer(request.getContext());
+ pv.setNext(testerValve);
+
+ // Run the test
+ Thread[] threads = new Thread[5];
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ pv.invoke(request, response);
+ } catch (IOException | ServletException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+
+ Assert.assertFalse(testerValve.getConcurrencyLimitExceeded());
+ }
+
+
+ private static class TesterValve extends ValveBase {
+
+ private static volatile boolean concurrencyLimitExceeded;
+ private static AtomicInteger concurrency = new AtomicInteger();
+
+ @Override
+ public void invoke(Request request, Response response) throws IOException, ServletException {
+ int c = concurrency.incrementAndGet();
+ if (c > 1) {
+ concurrencyLimitExceeded = true;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ concurrency.decrementAndGet();
+ }
+
+ public boolean getConcurrencyLimitExceeded() {
+ return concurrencyLimitExceeded;
+ }
+ }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 0e0c4bfbbe..68b45c6e7b 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -114,6 +114,12 @@
virtual threads. This Executor requires a minimum Java version of Java
21. (markt)
</add>
+ <fix>
+ <bug>66513</bug>: Partial fix that adds a per session Semaphore to the
+ <code>PersistentValve</code> that ensures that, within a single Tomcat
+ instance, there is no more than one concurrent request per session.
+ (markt)
+ </fix>
<fix>
<bug>66609</bug>: Ensure that the default servlet correctly escapes
file names in directory listings when using XML output. Based on pull
diff --git a/webapps/docs/config/valve.xml b/webapps/docs/config/valve.xml
index 2da2666002..4be06d0139 100644
--- a/webapps/docs/config/valve.xml
+++ b/webapps/docs/config/valve.xml
@@ -2599,6 +2599,27 @@
<code>java.util.regex</code>.</p>
</attribute>
+ <attribute name="semaphoreAcquireUninterruptibly" required="false">
+ <p>Flag to determine if a thread that blocks waiting for the per session
+ Semaphore should do so uninterruptibly. Has no effect if
+ <strong>semaphoreBlockOnAcquire</strong> is <code>false</code>. If not
+ specified, the default value of <code>true</code> will be used.</p>
+ </attribute>
+
+ <attribute name="semaphoreBlockOnAcquire" required="false">
+ <p>Flag to determine if a thread that wishes to acquire the per session
+ Semaphore when it is held by another thread should block until it can
+ acquire the Semaphore or if the waiting request be rejected. If not
+ specified, the default value of <code>true</code> will be used.</p>
+ </attribute>
+
+ <attribute name="semaphoreFairness" required="false">
+ <p>Flag to determine if the per session Semaphore will grant requests
+ for the Semaphore in the same order they were received. Has no effect if
+ <strong>semaphoreBlockOnAcquire</strong> is <code>false</code>. If not
+ specified, the default value of <code>true</code> will be used.</p>
+ </attribute>
+
</attributes>
</subsection>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org