You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ly...@apache.org on 2023/06/20 22:21:35 UTC

[tinkerpop] branch 3.5-dev updated: TINKERPOP-2958 Added cancel for query timeout task when query completes. (#2090)

This is an automated email from the ASF dual-hosted git repository.

lyndonb pushed a commit to branch 3.5-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/3.5-dev by this push:
     new b449a9c3e1 TINKERPOP-2958 Added cancel for query timeout task when query completes. (#2090)
b449a9c3e1 is described below

commit b449a9c3e17e99b340205dfaf58ee69d3b312a87
Author: Lyndon Bauto <ly...@apache.org>
AuthorDate: Tue Jun 20 15:21:30 2023 -0700

    TINKERPOP-2958 Added cancel for query timeout task when query completes. (#2090)
---
 CHANGELOG.asciidoc                                 |  1 +
 .../apache/tinkerpop/gremlin/server/Context.java   | 23 ++++++++++++++++++++++
 .../server/op/session/SessionOpProcessor.java      | 13 ++++++++++--
 .../server/op/traversal/TraversalOpProcessor.java  | 13 ++++++++++--
 4 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 175d54344e..b91a7b4ca1 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -30,6 +30,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Upgraded `gremlin-go` to Go 1.20.
 * Improved the python Translator class
 * Added `gremlin-java8.bat` file as a workaround to allow loading the console using Java 8 on Windows.
+* Fixed a bug in `gremlin-server` where timeout tasks were not cancelled and could cause very large memory usage when the timeout is large.
 
 [[release-3-5-6]]
 === TinkerPop 3.5.6 (Release Date: May 1, 2023)
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index ad15772509..38f2f25966 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -55,6 +56,9 @@ public class Context {
     private final RequestContentType requestContentType;
     private final Object gremlinArgument;
     private final AtomicBoolean startedResponse = new AtomicBoolean(false);
+    private ScheduledFuture<?> timeoutExecutor = null;
+    private boolean timeoutExecutorGrabbed = false;
+    private final Object timeoutExecutorLock = new Object();
 
     /**
      * The type of the request as determined by the contents of {@link Tokens#ARGS_GREMLIN}.
@@ -92,6 +96,25 @@ public class Context {
         this.requestTimeout = determineTimeout();
     }
 
+    public void setTimeoutExecutor(final ScheduledFuture<?> timeoutExecutor) {
+        synchronized (timeoutExecutorLock) {
+            this.timeoutExecutor = timeoutExecutor;
+
+            // The timeout was grabbed before we got here, which means the query finished before the timeout was created.
+            if (timeoutExecutorGrabbed) {
+                // Cancel the timeout.
+                this.timeoutExecutor.cancel(true);
+            }
+        }
+    }
+
+    public ScheduledFuture<?> getTimeoutExecutor() {
+        synchronized (timeoutExecutorLock) {
+            timeoutExecutorGrabbed = true;
+            return this.timeoutExecutor;
+        }
+    }
+
     /**
      * The timeout for the request. If the request is a script it examines the script for a timeout setting using
      * {@code with()}. If that is not found then it examines the request itself to see if the timeout is provided by
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index 4077ffdb11..3d17786be9 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -65,6 +65,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -466,6 +467,14 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
             } finally {
                 // todo: timer matter???
                 //timerContext.stop();
+
+                // There is a race condition in which this query may finish before the timeoutFuture is created,
+                // though this is very unlikely. The setter handles that case: getTimeoutExecutor() records that
+                // the future has been grabbed, so if the setter is only called after this point, it cancels the
+                // timeoutFuture itself to compensate.
+                final ScheduledFuture<?> timeoutFuture = context.getTimeoutExecutor();
+                if (null != timeoutFuture)
+                    timeoutFuture.cancel(true);
             }
 
             return null;
@@ -479,12 +488,12 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
         final Future<?> executionFuture = session.getGremlinExecutor().getExecutorService().submit(evalFuture);
         if (seto > 0) {
             // Schedule a timeout in the thread pool for future execution
-            context.getScheduledExecutorService().schedule(() -> {
+            context.setTimeoutExecutor(context.getScheduledExecutorService().schedule(() -> {
                 executionFuture.cancel(true);
                 if (!context.getStartedResponse()) {
                     context.sendTimeoutResponse();
                 }
-            }, seto, TimeUnit.MILLISECONDS);
+            }, seto, TimeUnit.MILLISECONDS));
         }
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 824fb216ed..26ca6f2f4c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -63,6 +63,7 @@ import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import static com.codahale.metrics.MetricRegistry.name;
@@ -266,6 +267,14 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                 }
             } finally {
                 timerContext.stop();
+
+                // There is a race condition in which this query may finish before the timeoutFuture is created,
+                // though this is very unlikely. The setter handles that case: getTimeoutExecutor() records that
+                // the future has been grabbed, so if the setter is only called after this point, it cancels the
+                // timeoutFuture itself to compensate.
+                final ScheduledFuture<?> timeoutFuture = context.getTimeoutExecutor();
+                if (null != timeoutFuture)
+                    timeoutFuture.cancel(true);
             }
 
             return null;
@@ -275,12 +284,12 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
             final Future<?> executionFuture = context.getGremlinExecutor().getExecutorService().submit(evalFuture);
             if (seto > 0) {
                 // Schedule a timeout in the thread pool for future execution
-                context.getScheduledExecutorService().schedule(() -> {
+                context.setTimeoutExecutor(context.getScheduledExecutorService().schedule(() -> {
                     executionFuture.cancel(true);
                     if (!context.getStartedResponse()) {
                         context.sendTimeoutResponse();
                     }
-                }, seto, TimeUnit.MILLISECONDS);
+                }, seto, TimeUnit.MILLISECONDS));
             }
         } catch (RejectedExecutionException ree) {
             context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.TOO_MANY_REQUESTS)