You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/09/18 21:26:14 UTC

[2/2] incubator-tinkerpop git commit: Do a better job closing down sessions in Gremlin Server.

Do a better job closing down sessions in Gremlin Server.

Gremlin Server was relying on a scheduled event to close sessions and thus close any open transactions for that session.  Of course, shutting down gremlin server apparently did't allow those scheduled jobs to complete on gremlin server shutdown.  Hooked into the shutdown of Gremlin Server and directly closed the sessions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0d3dfa66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0d3dfa66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0d3dfa66

Branch: refs/heads/tp30
Commit: 0d3dfa66336927a59144c3dac17f9cf2d1538f7c
Parents: 599f8a9
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri Sep 18 12:24:20 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Sep 18 12:24:20 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../tinkerpop/gremlin/server/GremlinServer.java | 11 ++++++
 .../tinkerpop/gremlin/server/OpProcessor.java   |  4 +-
 .../tinkerpop/gremlin/server/op/OpLoader.java   | 14 ++++++-
 .../server/op/control/ControlOpProcessor.java   |  6 +++
 .../gremlin/server/op/session/Session.java      | 40 +++++++++++++-------
 .../server/op/session/SessionOpProcessor.java   |  6 +++
 .../server/op/standard/StandardOpProcessor.java |  6 +++
 8 files changed, 73 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 60c555d..d30fad5 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.0.2 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Improved session closing for transactional graphs during shutdown of Gremlin Server.
 * Fixed id parameter used in tests for `GroovyStoreTest` and `GroovyRepeatTest` to not be treated as an embedded string.
 * `GraphStep` will convert any `Vertex` or `Edge` ids to their id `Object` prior to submission to `GraphComputer` (OLAP).
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 3248ce0..b3330b9 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import org.apache.tinkerpop.gremlin.server.op.OpLoader;
 import org.apache.tinkerpop.gremlin.server.util.LifeCycleHook;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
 import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
@@ -200,6 +201,16 @@ public class GremlinServer {
         serverStopped = new CompletableFuture<>();
         final CountDownLatch servicesLeftToShutdown = new CountDownLatch(3);
 
+        // release resources in the OpProcessors (e.g. kill sessions)
+        OpLoader.getProcessors().entrySet().forEach(kv -> {
+            logger.info("Shutting down OpProcessor[{}]", kv.getKey());
+            try {
+                kv.getValue().close();
+            } catch (Exception ex) {
+                logger.warn("Shutdown will continue but, there was an error encountered while closing " + kv.getKey(), ex);
+            }
+        });
+
         // it's possible that a channel might not be initialized in the first place if bind() fails because
         // of port conflict.  in that case, there's no need to wait for the channel to close.
         if (null == ch)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/OpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/OpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/OpProcessor.java
index 57e560d..27875ac 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/OpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/OpProcessor.java
@@ -21,12 +21,14 @@ package org.apache.tinkerpop.gremlin.server;
 import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
 import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer;
 
+import java.io.Closeable;
+
 /**
  * Interface for providing commands that websocket requests will respond to.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public interface OpProcessor {
+public interface OpProcessor extends AutoCloseable {
 
     /**
      * The name of the processor which requests must refer to "processor" field on a request.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java
index 166aead..96b415c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/OpLoader.java
@@ -23,13 +23,14 @@ import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.ServiceLoader;
 
 /**
- * Uses {@code ServiceLoader} to load {@link OpProcessor} instances into a cache.
+ * Uses {@link ServiceLoader} to load {@link OpProcessor} instances into a cache.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
@@ -49,7 +50,18 @@ public final class OpLoader {
         });
     }
 
+    /**
+     * Gets an {@link OpProcessor} by its name. If it cannot be found an {@link Optional#EMPTY} is returned.
+     */
     public static Optional<OpProcessor> getProcessor(final String name) {
         return Optional.ofNullable(processors.get(name));
     }
+
+    /**
+     * Gets a read-only map of the processors where the key is the {@link OpProcessor} name and the value is the
+     * instance created by {@link ServiceLoader}.
+     */
+    public static Map<String, OpProcessor> getProcessors() {
+        return Collections.unmodifiableMap(processors);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/control/ControlOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/control/ControlOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/control/ControlOpProcessor.java
index db57c8c..81807e9 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/control/ControlOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/control/ControlOpProcessor.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -86,6 +87,11 @@ public class ControlOpProcessor implements OpProcessor {
         return op;
     }
 
+    @Override
+    public void close() throws Exception {
+        // do nothing = no resources to release
+    }
+
     private static Optional<ThrowingConsumer<Context>> validateImportMessage(final RequestMessage message) throws OpProcessorException {
         final Optional<List> l = message.optionalArgs(Tokens.ARGS_IMPORTS);
         if (!l.isPresent()) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
index e998c3a..f871301 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
@@ -104,23 +104,37 @@ public class Session {
         if (null == killFuture || !killFuture.isDone()) {
             if (killFuture != null) killFuture.cancel(false);
             kill.set(this.scheduledExecutorService.schedule(() -> {
-                // when the session is killed open transaction should be rolled back
-                graphManager.getGraphs().values().forEach(g -> {
-                    if (g.features().graph().supportsTransactions()) {
-                        // have to execute the rollback in the executor because the transaction is associated with
-                        // that thread of execution from this session
-                        this.executor.execute(() -> {
-                            logger.info("Rolling back any open transactions before killing idle session: {}", this.session);
-                            if (g.tx().isOpen()) g.tx().rollback();
-                        });
-                    }
-                });
-                sessions.remove(this.session);
-                logger.info("Kill idle session named {} after {} milliseconds", this.session, this.configuredSessionTimeout);
+                logger.info("Session {} has been idle for more than {} milliseconds - preparing to close",
+                        this.session, this.configuredSessionTimeout);
+                kill();
             }, this.configuredSessionTimeout, TimeUnit.MILLISECONDS));
         }
     }
 
+    /**
+     * Kills the session and rollback any uncommitted changes on transactional graphs.
+     */
+    public void kill() {
+        // when the session is killed open transaction should be rolled back
+        graphManager.getGraphs().entrySet().forEach(kv -> {
+            final Graph g = kv.getValue();
+            if (g.features().graph().supportsTransactions()) {
+                // have to execute the rollback in the executor because the transaction is associated with
+                // that thread of execution from this session
+                try {
+                    executor.submit(() -> {
+                        logger.info("Rolling back open transactions on {} before killing session: {}", kv.getKey(), session);
+                        if (g.tx().isOpen()) g.tx().rollback();
+                    }).get(30000, TimeUnit.MILLISECONDS);
+                } catch (Exception ex) {
+                    logger.warn("An error occurred while attempting rollback when closing session: " + session, ex);
+                }
+            }
+        });
+        sessions.remove(session);
+        logger.info("Session {} closed", session);
+    }
+
     private GremlinExecutor.Builder initializeGremlinExecutor() {
         final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
                 .scriptEvaluationTimeout(settings.scriptEvaluationTimeout)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index 9c256f2..bdeb02a 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.script.Bindings;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -105,6 +106,11 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
         return Optional.empty();
     }
 
+    @Override
+    public void close() throws Exception {
+       sessions.values().forEach(Session::kill);
+    }
+
     protected void evalOp(final Context context) throws OpProcessorException {
         final RequestMessage msg = context.getRequestMessage();
         final Session session = getSession(context, msg);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0d3dfa66/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
index 1e0bc03..42cf649 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/standard/StandardOpProcessor.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.script.Bindings;
 import javax.script.SimpleBindings;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
 
@@ -61,6 +62,11 @@ public class StandardOpProcessor extends AbstractEvalOpProcessor {
         return this::evalOp;
     }
 
+    @Override
+    public void close() throws Exception {
+        // do nothing = no resources to release
+    }
+
     private void evalOp(final Context context) throws OpProcessorException {
         final RequestMessage msg = context.getRequestMessage();