You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2023/06/01 09:36:28 UTC

[tomcat] branch 9.0.x updated: BZ 66513: Enforce one concurrent request per session requirement (#623)

This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/9.0.x by this push:
     new 1311908259 BZ 66513: Enforce one concurrent request per session requirement (#623)
1311908259 is described below

commit 13119082591bd38ac46a7367b3e8372650615b37
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Thu Jun 1 11:16:16 2023 +0200

    BZ 66513: Enforce one concurrent request per session requirement (#623)
---
 .../apache/catalina/session/PersistentManager.java |   2 +
 .../apache/catalina/valves/PersistentValve.java    | 338 ++++++++++++++++-----
 .../catalina/valves/TestPersistentValve.java       |  94 ++++++
 webapps/docs/changelog.xml                         |   6 +
 webapps/docs/config/valve.xml                      |  21 ++
 5 files changed, 385 insertions(+), 76 deletions(-)

diff --git a/java/org/apache/catalina/session/PersistentManager.java b/java/org/apache/catalina/session/PersistentManager.java
index 5fd7cd8ea2..815c9e38fd 100644
--- a/java/org/apache/catalina/session/PersistentManager.java
+++ b/java/org/apache/catalina/session/PersistentManager.java
@@ -24,6 +24,8 @@ package org.apache.catalina.session;
  * <li>Fault tolerance, keep sessions backed up on disk to allow recovery in the event of unplanned restarts.</li>
  * <li>Limit the number of active sessions kept in memory by swapping less active sessions out to disk.</li>
  * </ul>
+ * If used with a load-balancer, the load-balancer must be configured to use sticky sessions for this manager to operate
+ * correctly.
  *
  * @author Kief Morris (kief@kief.com)
  */
diff --git a/java/org/apache/catalina/valves/PersistentValve.java b/java/org/apache/catalina/valves/PersistentValve.java
index 8e74e46e0f..bf2df77941 100644
--- a/java/org/apache/catalina/valves/PersistentValve.java
+++ b/java/org/apache/catalina/valves/PersistentValve.java
@@ -17,6 +17,10 @@
 package org.apache.catalina.valves;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -36,13 +40,31 @@ import org.apache.catalina.connector.Request;
 import org.apache.catalina.connector.Response;
 
 /**
- * Valve that implements per-request session persistence. It is intended to be used with non-sticky load-balancers.
+ * Valve that implements per-request session persistence. It is intended to be used with non-sticky load-balancers and a
+ * PersistentManager. The Valve works by loading the session from the Store at the start of the request, the request
+ * then updates the session as required and the Valve saves the session to the Store at the end of the request.
  * <p>
- * <b>USAGE CONSTRAINT</b>: To work correctly it requires a PersistentManager.
+ * To avoid conflicts and/or errors when updating the session store, each session must only be accessed by no more than
+ * one concurrent request. The {@code filter} field can be used to define requests (e.g. those for static resources)
+ * that do not need access to the session and can therefore bypass the session load/save functionality provided by
+ * this Valve.
  * <p>
- * <b>USAGE CONSTRAINT</b>: To work correctly it assumes only one request exists per session at any one time.
- *
- * @author Jean-Frederic Clere
+ * The Valve uses a per session {@code Semaphore} to ensure that each session is accessed by no more than one request at
+ * a time within a single Tomcat instance. The behaviour if multiple requests try to access the session concurrently can
+ * be controlled by the {@code semaphoreFairness}, {@code semaphoreBlockOnAcquire} and {@code
+ * semaphoreAcquireUninterruptibly} fields. If a request fails to obtain the Semaphore, the response is generated by the
+ * {@link #onSemaphoreNotAcquired(Request, Response)} method which, by default, returns a {@code 429} status code.
+ * <p>
+ * The per session Semaphores only provide limited protection against concurrent requests within a single Tomcat
+ * instance. If multiple requests access the same session concurrently across different Tomcat instances, update
+ * conflicts and/or session data loss and/or errors are very likely.
+ * <p>
+ * <b>USAGE CONSTRAINTS</b>:
+ * <ul>
+ * <li>This Valve must only be used with a PersistentManager</li>
+ * <li>The client must ensure that no more than one concurrent request accesses a session at any time across all Tomcat
+ * instances</li>
+ * </ul>
  */
 public class PersistentValve extends ValveBase {
 
@@ -55,15 +77,20 @@ public class PersistentValve extends ValveBase {
 
     protected Pattern filter = null;
 
-    // ------------------------------------------------------ Constructor
+    private ConcurrentMap<String,UsageCountingSemaphore> sessionToSemaphoreMap = new ConcurrentHashMap<>();
+
+    private boolean semaphoreFairness = true;
+
+    private boolean semaphoreBlockOnAcquire = true;
+
+    private boolean semaphoreAcquireUninterruptibly = true;
+
 
     public PersistentValve() {
         super(true);
     }
 
 
-    // --------------------------------------------------------- Public Methods
-
     @Override
     public void setContainer(Container container) {
         super.setContainer(container);
@@ -101,72 +128,99 @@ public class PersistentValve extends ValveBase {
             return;
         }
 
-        // Update the session last access time for our session (if any)
         String sessionId = request.getRequestedSessionId();
-        Manager manager = context.getManager();
-        if (sessionId != null && manager instanceof StoreManager) {
-            Store store = ((StoreManager) manager).getStore();
-            if (store != null) {
-                Session session = null;
-                try {
-                    session = store.load(sessionId);
-                } catch (Exception e) {
-                    container.getLogger().error("deserializeError");
-                }
-                if (session != null) {
-                    if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) {
-                        if (container.getLogger().isDebugEnabled()) {
-                            container.getLogger().debug("session swapped in is invalid or expired");
-                        }
-                        session.expire();
-                        store.remove(sessionId);
+        UsageCountingSemaphore semaphore = null;
+        boolean mustReleaseSemaphore = true;
+
+        try {
+            // Acquire the per session semaphore
+            if (sessionId != null) {
+                semaphore = sessionToSemaphoreMap.compute(sessionId,
+                        (k, v) -> v == null ? new UsageCountingSemaphore(semaphoreFairness) : v.incrementUsageCount());
+                if (semaphoreBlockOnAcquire) {
+                    if (semaphoreAcquireUninterruptibly) {
+                        semaphore.acquireUninterruptibly();
                     } else {
-                        session.setManager(manager);
-                        // session.setId(sessionId); Only if new ???
-                        manager.add(session);
-                        // ((StandardSession)session).activate();
-                        session.access();
-                        session.endAccess();
+                        try {
+                            semaphore.acquire();
+                        } catch (InterruptedException e) {
+                            mustReleaseSemaphore = false;
+                            onSemaphoreNotAcquired(request, response);
+                            return;
+                        }
+                    }
+                } else {
+                    if (!semaphore.tryAcquire()) {
+                        onSemaphoreNotAcquired(request, response);
+                        return;
                     }
                 }
             }
-        }
-        if (container.getLogger().isDebugEnabled()) {
-            container.getLogger().debug("sessionId: " + sessionId);
-        }
-
-        // Ask the next valve to process the request.
-        getNext().invoke(request, response);
 
-        // If still processing async, don't try to store the session
-        if (!request.isAsync()) {
-            // Read the sessionid after the response.
-            // HttpSession hsess = hreq.getSession(false);
-            Session hsess;
-            try {
-                hsess = request.getSessionInternal(false);
-            } catch (Exception ex) {
-                hsess = null;
-            }
-            String newsessionId = null;
-            if (hsess != null) {
-                newsessionId = hsess.getIdInternal();
+            // Update the session last access time for our session (if any)
+            Manager manager = context.getManager();
+            if (sessionId != null && manager instanceof StoreManager) {
+                Store store = ((StoreManager) manager).getStore();
+                if (store != null) {
+                    Session session = null;
+                    try {
+                        session = store.load(sessionId);
+                    } catch (Exception e) {
+                        container.getLogger().error("deserializeError");
+                    }
+                    if (session != null) {
+                        if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) {
+                            if (container.getLogger().isDebugEnabled()) {
+                                container.getLogger().debug("session swapped in is invalid or expired");
+                            }
+                            session.expire();
+                            store.remove(sessionId);
+                        } else {
+                            session.setManager(manager);
+                            // session.setId(sessionId); Only if new ???
+                            manager.add(session);
+                            // ((StandardSession)session).activate();
+                            session.access();
+                            session.endAccess();
+                        }
+                    }
+                }
             }
-
             if (container.getLogger().isDebugEnabled()) {
-                container.getLogger().debug("newsessionId: " + newsessionId);
+                container.getLogger().debug("sessionId: " + sessionId);
             }
-            if (newsessionId != null) {
+
+            // Ask the next valve to process the request.
+            getNext().invoke(request, response);
+
+            // If still processing async, don't try to store the session
+            if (!request.isAsync()) {
+                // Read the sessionid after the response.
+                // HttpSession hsess = hreq.getSession(false);
+                Session hsess;
                 try {
-                    bind(context);
-
-                    /* store the session and remove it from the manager */
-                    if (manager instanceof StoreManager) {
-                        Session session = manager.findSession(newsessionId);
-                        Store store = ((StoreManager) manager).getStore();
-                        boolean stored = false;
-                        if (session != null) {
-                            synchronized (session) {
+                    hsess = request.getSessionInternal(false);
+                } catch (Exception ex) {
+                    hsess = null;
+                }
+                String newsessionId = null;
+                if (hsess != null) {
+                    newsessionId = hsess.getIdInternal();
+                }
+
+                if (container.getLogger().isDebugEnabled()) {
+                    container.getLogger().debug("newsessionId: " + newsessionId);
+                }
+                if (newsessionId != null) {
+                    try {
+                        bind(context);
+
+                        /* store the session and remove it from the manager */
+                        if (manager instanceof StoreManager) {
+                            Session session = manager.findSession(newsessionId);
+                            Store store = ((StoreManager) manager).getStore();
+                            boolean stored = false;
+                            if (session != null) {
                                 if (store != null && session.isValid() &&
                                         !isSessionStale(session, System.currentTimeMillis())) {
                                     store.save(session);
@@ -175,28 +229,51 @@ public class PersistentValve extends ValveBase {
                                     stored = true;
                                 }
                             }
-                        }
-                        if (!stored) {
+                            if (!stored) {
+                                if (container.getLogger().isDebugEnabled()) {
+                                    container.getLogger()
+                                            .debug("newsessionId store: " + store + " session: " + session +
+                                                    " valid: " +
+                                                    (session == null ? "N/A" : Boolean.toString(session.isValid())) +
+                                                    " stale: " + isSessionStale(session, System.currentTimeMillis()));
+                                }
+                            }
+                        } else {
                             if (container.getLogger().isDebugEnabled()) {
-                                container.getLogger()
-                                        .debug("newsessionId store: " + store + " session: " + session + " valid: " +
-                                                (session == null ? "N/A" : Boolean.toString(session.isValid())) +
-                                                " stale: " + isSessionStale(session, System.currentTimeMillis()));
+                                container.getLogger().debug("newsessionId Manager: " + manager);
                             }
                         }
-                    } else {
-                        if (container.getLogger().isDebugEnabled()) {
-                            container.getLogger().debug("newsessionId Manager: " + manager);
-                        }
+                    } finally {
+                        unbind(context);
                     }
-                } finally {
-                    unbind(context);
                 }
             }
+        } finally {
+            if (semaphore != null) {
+                if (mustReleaseSemaphore) {
+                    semaphore.release();
+                }
+                sessionToSemaphoreMap.computeIfPresent(sessionId,
+                        (k, v) -> v.decrementAndGetUsageCount() == 0 ? null : v);
+            }
         }
     }
 
 
+    /**
+     * Handle the case where a semaphore cannot be obtained. The default behaviour is to return a 429 (too many
+     * requests) status code.
+     *
+     * @param request  The request that will not be processed
+     * @param response The response that will be used for this request
+     *
+     * @throws IOException If an I/O error occurs while working with the request or response
+     */
+    protected void onSemaphoreNotAcquired(Request request, Response response) throws IOException {
+        response.sendError(429);
+    }
+
+
     /**
      * Indicate whether the session has been idle for longer than its expiration date as of the supplied time. FIXME:
      * Probably belongs in the Session class.
@@ -258,4 +335,113 @@ public class PersistentValve extends ValveBase {
             }
         }
     }
+
+
+    /**
+     * If multiple threads attempt to acquire the same per session Semaphore, will permits be granted in the same order
+     * they were requested?
+     *
+     * @return {@code true} if fairness is enabled, otherwise {@code false}
+     */
+    public boolean isSemaphoreFairness() {
+        return semaphoreFairness;
+    }
+
+
+    /**
+     * Configure whether the per session Semaphores will handle granting of permits in the same order they were
+     * requested if multiple threads attempt to acquire the same Semaphore.
+     *
+     * @param semaphoreFairness {@code true} if permits should be granted in the same order they are requested,
+     *                              otherwise {@code false}
+     */
+    public void setSemaphoreFairness(boolean semaphoreFairness) {
+        this.semaphoreFairness = semaphoreFairness;
+    }
+
+
+    /**
+     * If a thread attempts to acquire the per session Semaphore while it is being used by another request, should the
+     * thread block to wait for the Semaphore or should the request be rejected?
+     *
+     * @return {@code true} if the thread should block, otherwise {@code false} to reject the concurrent request
+     */
+    public boolean isSemaphoreBlockOnAcquire() {
+        return semaphoreBlockOnAcquire;
+    }
+
+
+    /**
+     * Configure whether a thread should block and wait for the per session Semaphore or reject the request if the
+     * Semaphore is being used by another request.
+     *
+     * @param semaphoreBlockOnAcquire {@code true} to block, otherwise {@code false}
+     */
+    public void setSemaphoreBlockOnAcquire(boolean semaphoreBlockOnAcquire) {
+        this.semaphoreBlockOnAcquire = semaphoreBlockOnAcquire;
+    }
+
+
+    /**
+     * If a thread is blocking to acquire a per session Semaphore, can that thread be interrupted?
+     *
+     * @return {@code true} if the thread can <b>not</b> be interrupted, otherwise {@code false}.
+     */
+    public boolean isSemaphoreAcquireUninterruptibly() {
+        return semaphoreAcquireUninterruptibly;
+    }
+
+
+    /**
+     * Configure whether a thread blocking to acquire a per session Semaphore can be interrupted.
+     *
+     * @param semaphoreAcquireUninterruptibly {@code true} if the thread can <b>not</b> be interrupted, otherwise
+     *                                            {@code false}.
+     */
+    public void setSemaphoreAcquireUninterruptibly(boolean semaphoreAcquireUninterruptibly) {
+        this.semaphoreAcquireUninterruptibly = semaphoreAcquireUninterruptibly;
+    }
+
+
+    /*
+     * The PersistentValve uses a per session semaphore to ensure that only one request accesses a session at a time. To
+     * limit the size of the session ID to Semaphore map, the Semaphores are created when required and destroyed (made
+     * eligible for GC) as soon as they are not required. Tracking usage in a thread-safe way requires a usage counter
+     * that does not block. The Semaphore's internal tracking can't be used because the only way to increment usage is
+     * via the acquire methods and they block. Therefore, this class was created which uses a separate AtomicLong to
+     * track usage.
+     */
+    private static class UsageCountingSemaphore {
+        private final AtomicLong usageCount = new AtomicLong(1);
+        private final Semaphore semaphore;
+
+        private UsageCountingSemaphore(boolean fairness) {
+            semaphore = new Semaphore(1, fairness);
+        }
+
+        private UsageCountingSemaphore incrementUsageCount() {
+            usageCount.incrementAndGet();
+            return this;
+        }
+
+        private long decrementAndGetUsageCount() {
+            return usageCount.decrementAndGet();
+        }
+
+        private void acquire() throws InterruptedException {
+            semaphore.acquire();
+        }
+
+        private void acquireUninterruptibly() {
+            semaphore.acquireUninterruptibly();
+        }
+
+        private boolean tryAcquire() {
+            return semaphore.tryAcquire();
+        }
+
+        private void release() {
+            semaphore.release();
+        }
+    }
 }
diff --git a/test/org/apache/catalina/valves/TestPersistentValve.java b/test/org/apache/catalina/valves/TestPersistentValve.java
new file mode 100644
index 0000000000..b53f85c05e
--- /dev/null
+++ b/test/org/apache/catalina/valves/TestPersistentValve.java
@@ -0,0 +1,94 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.valves;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.ServletException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.connector.Request;
+import org.apache.catalina.connector.Response;
+import org.apache.tomcat.unittest.TesterRequest;
+import org.apache.tomcat.unittest.TesterResponse;
+
+public class TestPersistentValve {
+
+    @Test
+    public void testSemaphore() throws Exception {
+        // Create the test objects
+        PersistentValve pv = new PersistentValve();
+        Request request = new TesterRequest();
+        Response response = new TesterResponse();
+        TesterValve testerValve = new TesterValve();
+
+        // Configure the test objects
+        request.setRequestedSessionId("1234");
+
+        // Plumb the test objects together
+        pv.setContainer(request.getContext());
+        pv.setNext(testerValve);
+
+        // Run the test
+        Thread[] threads = new Thread[5];
+
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(() -> {
+                try {
+                    pv.invoke(request, response);
+                } catch (IOException | ServletException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].start();
+        }
+
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].join();
+        }
+
+        Assert.assertEquals(1, testerValve.getMaximumConcurrency());
+    }
+
+
+    private static class TesterValve extends ValveBase {
+
+        private static AtomicInteger maximumConcurrency = new AtomicInteger();
+        private static AtomicInteger concurrency = new AtomicInteger();
+
+        @Override
+        public void invoke(Request request, Response response) throws IOException, ServletException {
+            int c = concurrency.incrementAndGet();
+            maximumConcurrency.getAndUpdate((v) -> c > v ? c : v);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+            concurrency.decrementAndGet();
+        }
+
+        public int getMaximumConcurrency() {
+            return maximumConcurrency.get();
+        }
+    }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index b0ae0efbc6..b9a4603707 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -123,6 +123,12 @@
         virtual threads. This Executor requires a minimum Java version of Java
         21. (markt)
       </add>
+      <fix>
+        <bug>66513</bug>: Partial fix that adds a per session Semaphore to the
+        <code>PersistentValve</code> that ensures that, within a single Tomcat
+        instance, there is no more than one concurrent request per session.
+        (markt)
+      </fix>
       <fix>
         <bug>66609</bug>: Ensure that the default servlet correctly escapes
         file names in directory listings when using XML output. Based on pull
diff --git a/webapps/docs/config/valve.xml b/webapps/docs/config/valve.xml
index 056533b546..c796dd0131 100644
--- a/webapps/docs/config/valve.xml
+++ b/webapps/docs/config/valve.xml
@@ -2637,6 +2637,27 @@
         <code>java.util.regex</code>.</p>
       </attribute>
 
+      <attribute name="semaphoreAcquireUninterruptibly" required="false">
+        <p>Flag to determine if a thread that blocks waiting for the per session
+        Semaphore should do so uninterruptibly. Has no effect if
+        <strong>semaphoreBlockOnAcquire</strong> is <code>false</code>. If not
+        specified, the default value of <code>true</code> will be used.</p>
+      </attribute>
+
+      <attribute name="semaphoreBlockOnAcquire" required="false">
+        <p>Flag to determine if a thread that wishes to acquire the per session
+        Semaphore when it is held by another thread should block until it can
+        acquire the Semaphore or if the waiting request should be rejected. If
+        not specified, the default value of <code>true</code> will be used.</p>
+      </attribute>
+
+      <attribute name="semaphoreFairness" required="false">
+        <p>Flag to determine if the per session Semaphore will grant requests
+        for the Semaphore in the same order they were received. Has no effect if
+        <strong>semaphoreBlockOnAcquire</strong> is <code>false</code>. If not
+        specified, the default value of <code>true</code> will be used.</p>
+      </attribute>
+
     </attributes>
 
   </subsection>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org