You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ly...@apache.org on 2023/06/12 18:19:07 UTC

[tinkerpop] branch TINKERPOP-2958 updated: TINKERPOP-2958 Added cancel for query timeout task when query completes.

This is an automated email from the ASF dual-hosted git repository.

lyndonb pushed a commit to branch TINKERPOP-2958
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/TINKERPOP-2958 by this push:
     new 21eda2074c TINKERPOP-2958 Added cancel for query timeout task when query completes.
21eda2074c is described below

commit 21eda2074c753d73df9a3839ba54b300b9a6864c
Author: lyndon <ly...@apache.org>
AuthorDate: Mon Jun 12 11:18:57 2023 -0700

    TINKERPOP-2958 Added cancel for query timeout task when query completes.
---
 CHANGELOG.asciidoc                                 |  1 +
 .../apache/tinkerpop/gremlin/server/Context.java   | 23 ++++++++++++++++++++++
 .../server/op/session/SessionOpProcessor.java      | 13 ++++++++++--
 .../server/op/traversal/TraversalOpProcessor.java  | 11 +++++++++--
 4 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 14b1e8f724..07122d2b70 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -29,6 +29,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Upgraded `gremlin-go` to Go 1.20.
 * Improved the python Translator class
 * Added `gremlin-java8.bat` file as a workaround to allow loading the console using Java 8 on Windows.
+* Fixed a bug in `gremlin-server` where timeout tasks were not cancelled and could cause very large memory usage when the timeout is large.
 
 [[release-3-5-6]]
 === TinkerPop 3.5.6 (Release Date: May 1, 2023)
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index ad15772509..38f2f25966 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -55,6 +56,9 @@ public class Context {
     private final RequestContentType requestContentType;
     private final Object gremlinArgument;
     private final AtomicBoolean startedResponse = new AtomicBoolean(false);
+    private ScheduledFuture<?> timeoutExecutor = null;
+    private boolean timeoutExecutorGrabbed = false;
+    private final Object timeoutExecutorLock = new Object();
 
     /**
      * The type of the request as determined by the contents of {@link Tokens#ARGS_GREMLIN}.
@@ -92,6 +96,25 @@ public class Context {
         this.requestTimeout = determineTimeout();
     }
 
+    public void setTimeoutExecutor(final ScheduledFuture<?> timeoutExecutor) {
+        synchronized (timeoutExecutorLock) {
+            this.timeoutExecutor = timeoutExecutor;
+
+            // The timeout was already grabbed before we got here, which means the query finished before the timeout task was created.
+            if (timeoutExecutorGrabbed) {
+                // Cancel the timeout.
+                this.timeoutExecutor.cancel(true);
+            }
+        }
+    }
+
+    public ScheduledFuture<?> getTimeoutExecutor() {
+        synchronized (timeoutExecutorLock) {
+            timeoutExecutorGrabbed = true;
+            return this.timeoutExecutor;
+        }
+    }
+
     /**
      * The timeout for the request. If the request is a script it examines the script for a timeout setting using
      * {@code with()}. If that is not found then it examines the request itself to see if the timeout is provided by
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index 4077ffdb11..3d17786be9 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -65,6 +65,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -466,6 +467,14 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
             } finally {
                 // todo: timer matter???
                 //timerContext.stop();
+
+                // There is a race condition in which this query may finish before the timeoutFuture is created,
+                // though this is very unlikely. The setter handles it: getTimeoutExecutor() records that the
+                // future has been grabbed, so if the setter is only called after this point it cancels the
+                // timeoutFuture itself to compensate.
+                final ScheduledFuture<?> timeoutFuture = context.getTimeoutExecutor();
+                if (null != timeoutFuture)
+                    timeoutFuture.cancel(true);
             }
 
             return null;
@@ -479,12 +488,12 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
         final Future<?> executionFuture = session.getGremlinExecutor().getExecutorService().submit(evalFuture);
         if (seto > 0) {
             // Schedule a timeout in the thread pool for future execution
-            context.getScheduledExecutorService().schedule(() -> {
+            context.setTimeoutExecutor(context.getScheduledExecutorService().schedule(() -> {
                 executionFuture.cancel(true);
                 if (!context.getStartedResponse()) {
                     context.sendTimeoutResponse();
                 }
-            }, seto, TimeUnit.MILLISECONDS);
+            }, seto, TimeUnit.MILLISECONDS));
         }
     }
 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 824fb216ed..598e8ab476 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -63,6 +63,7 @@ import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import static com.codahale.metrics.MetricRegistry.name;
@@ -266,6 +267,12 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                 }
             } finally {
                 timerContext.stop();
+
+                // There is a race condition in which this query may finish before the timeoutFuture is created,
+                // though this is very unlikely. In that case the getter returns null, so guard against an NPE.
+                final ScheduledFuture<?> timeoutFuture = context.getTimeoutExecutor();
+                if (null != timeoutFuture)
+                    timeoutFuture.cancel(true);
             }
 
             return null;
@@ -275,12 +282,12 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
             final Future<?> executionFuture = context.getGremlinExecutor().getExecutorService().submit(evalFuture);
             if (seto > 0) {
                 // Schedule a timeout in the thread pool for future execution
-                context.getScheduledExecutorService().schedule(() -> {
+                context.setTimeoutExecutor(context.getScheduledExecutorService().schedule(() -> {
                     executionFuture.cancel(true);
                     if (!context.getStartedResponse()) {
                         context.sendTimeoutResponse();
                     }
-                }, seto, TimeUnit.MILLISECONDS);
+                }, seto, TimeUnit.MILLISECONDS));
             }
         } catch (RejectedExecutionException ree) {
             context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.TOO_MANY_REQUESTS)