Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2020/02/11 20:06:31 UTC

[tinkerpop] 01/01: TINKERPOP-2336 Added getMaxWaitForClose java driver setting.

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2336
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 4bd6a47abf9889e420427a660fe71a3f7866f78b
Author: Stephen Mallette <sp...@genoprime.com>
AuthorDate: Tue Feb 11 08:18:24 2020 -0500

    TINKERPOP-2336 Added getMaxWaitForClose java driver setting.
    
    Deprecated the "close" message for sessions along with its driver setting. The driver should stay compatible with older server versions, but the server now closes sessions with the close of the connection and there no longer is a block while all messages return from the server after a Client.close().
---
 CHANGELOG.asciidoc                                 |  3 ++
 docs/src/dev/provider/index.asciidoc               |  6 ++++
 docs/src/upgrade/release-3.3.x.asciidoc            | 27 +++++++++++++++++
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 34 ++++++++++++++++++++++
 .../tinkerpop/gremlin/driver/Connection.java       | 22 ++++++++++----
 .../apache/tinkerpop/gremlin/driver/Settings.java  | 16 ++++++++++
 .../gremlin/server/op/session/Session.java         | 12 ++++++++
 .../server/op/session/SessionOpProcessor.java      |  5 +++-
 .../server/GremlinServerSessionIntegrateTest.java  | 14 ++++++++-
 9 files changed, 131 insertions(+), 8 deletions(-)
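
The driver-side change in Connection.java bounds how long a close will wait for pending responses. As a rough illustration of that idea only, the following standalone sketch polls a condition on a fixed schedule and gives up once a deadline passes. The class name, method name, and parameters are hypothetical and use only the JDK; this is not the driver's actual code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class BoundedCloseSketch {

    /**
     * Polls okToClose every periodMillis. The returned future completes with true if
     * the supplier reported true before the deadline, or with false once maxWaitMillis
     * has elapsed (the "force close" case).
     */
    static CompletableFuture<Boolean> closeWhenIdle(final ScheduledExecutorService executor,
                                                    final BooleanSupplier okToClose,
                                                    final long maxWaitMillis,
                                                    final long periodMillis) {
        final CompletableFuture<Boolean> closed = new CompletableFuture<>();
        final long checkUntil = System.currentTimeMillis() + maxWaitMillis;

        final ScheduledFuture<?> task = executor.scheduleAtFixedRate(() -> {
            final boolean drained = okToClose.getAsBoolean();
            if (drained || System.currentTimeMillis() > checkUntil) {
                // complete() is a no-op if the future was already completed
                closed.complete(drained);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        // stop polling once the outcome has been decided
        closed.whenComplete((result, error) -> task.cancel(false));
        return closed;
    }

    public static void main(final String[] args) throws Exception {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // pending work that never drains: the deadline decides the outcome
            final boolean drained =
                    closeWhenIdle(executor, () -> false, 200, 50).get(5, TimeUnit.SECONDS);
            System.out.println("drained before deadline: " + drained);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the sketch the deadline is checked only when the periodic task fires, so the effective wait is rounded up to the next polling tick. The patch has the same property with its fixed 1000 ms period.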

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 2419b52..8d2523c 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -24,6 +24,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 [[release-3-3-11]]
 === TinkerPop 3.3.11 (Release Date: NOT OFFICIALLY RELEASED YET)
 
+* Modified Gremlin Server to close the session when the channel itself is closed.
+* Added `maxWaitForClose` configuration option to the Java driver.
+* Deprecated `maxWaitForSessionClose` in the Java driver.
 
 [[release-3-3-10]]
 === TinkerPop 3.3.10 (Release Date: February 3, 2020)
diff --git a/docs/src/dev/provider/index.asciidoc b/docs/src/dev/provider/index.asciidoc
index 1c4808c..ad16f76 100644
--- a/docs/src/dev/provider/index.asciidoc
+++ b/docs/src/dev/provider/index.asciidoc
@@ -1061,6 +1061,12 @@ Gremlin Server are deployed, session state is not shared among them.
 !`close` !Close the specified session. Will return a `NO CONTENT` message as confirmation of the close being completed.
 |=========================================================
 
+NOTE: The "close" message is deprecated as of 3.3.11 as servers at this version are required to automatically interrupt
+running processes on the close of the connection and release resources such as sessions. Servers wishing to be
+compatible with older versions of the driver need only send back a `NO_CONTENT` for this message. Drivers wishing to
+be compatible with servers prior to 3.3.11 may continue to send the message on calls to `close()`; otherwise, such code
+can be removed.
+
 **`authentication` operation arguments**
 
 [width="100%",cols="2,2,9",options="header"]
diff --git a/docs/src/upgrade/release-3.3.x.asciidoc b/docs/src/upgrade/release-3.3.x.asciidoc
index 989acde..97ea4e3 100644
--- a/docs/src/upgrade/release-3.3.x.asciidoc
+++ b/docs/src/upgrade/release-3.3.x.asciidoc
@@ -29,6 +29,33 @@ Please see the link:https://github.com/apache/tinkerpop/blob/3.3.11/CHANGELOG.as
 
 === Upgrading for Users
 
+==== Deprecate maxWaitForSessionClose
+
+The `maxWaitForSessionClose` setting for the Java driver has been deprecated and in some sense replaced by the
+`maxWaitForClose` setting. The two settings perform different functions, but expect `maxWaitForSessionClose` to be
+removed in future versions. The `maxWaitForClose` performs a more useful function than `maxWaitForSessionClose` in
+the sense that it tells the driver how long it should wait for pending messages from the server before closing the
+connection. The `maxWaitForSessionClose` on the other hand is how long the driver should wait for the server to
+respond to a session close message. Future versions will remove support for that particular message and simply close
+the session when the connection is closed; as a result, that setting will no longer be useful. The old setting is really
+only useful for connecting to older versions of the server prior to 3.3.11 that do not have the session shutdown hook
+bound to the close of the connection.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-2336[TINKERPOP-2336]
+
+=== Upgrading for Providers
+
+==== Gremlin Driver Providers
+
+===== Session Close
+
+The "close" message for the `SessionOpProcessor` is deprecated; however, the functionality to accept the message remains
+in Gremlin Server and the functionality to send the message remains in the Java driver. The expectation is that
+support for the message will be removed from the driver in a future release, likely at 3.5.0. Server implementations
+starting at 3.3.11 should look to use the close of a connection to trigger the close of a session and its release of
+resources.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-2336[TINKERPOP-2336]
 
 == TinkerPop 3.3.10
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 253072f..121e5c0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -398,12 +398,21 @@ public final class Cluster {
 
     /**
      * Gets how long a session will stay open assuming the current connection actually is configured for their use.
+     *
+     * @deprecated As of release 3.3.11, replaced in essence by {@link #getMaxWaitForClose()}.
      */
     public int getMaxWaitForSessionClose() {
         return manager.connectionPoolSettings.maxWaitForSessionClose;
     }
 
     /**
+     * Gets how long a connection will wait for all pending messages to be returned from the server before closing.
+     */
+    public int getMaxWaitForClose() {
+        return manager.connectionPoolSettings.maxWaitForClose;
+    }
+
+    /**
      * Gets the maximum size in bytes of any request sent to the server.
      */
     public int getMaxContentLength() {
@@ -576,6 +585,7 @@ public final class Cluster {
         private int minInProcessPerConnection = Connection.MIN_IN_PROCESS;
         private int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION;
         private int maxWaitForSessionClose = Connection.MAX_WAIT_FOR_SESSION_CLOSE;
+        private int maxWaitForClose = Connection.MAX_WAIT_FOR_CLOSE;
         private int maxContentLength = Connection.MAX_CONTENT_LENGTH;
         private int reconnectInterval = Connection.RECONNECT_INTERVAL;
         private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
@@ -883,13 +893,33 @@ public final class Cluster {
          * If the connection is using a "session" this setting represents the amount of time in milliseconds to wait
          * for that session to close before timing out where the default value is 3000. Note that the server will
          * eventually clean up dead sessions itself on expiration of the session or during shutdown.
+         *
+         * @deprecated As of release 3.3.11, replaced in essence by {@link #maxWaitForClose(int)} though behavior
+         * described here is still maintained.
          */
+        @Deprecated
         public Builder maxWaitForSessionClose(final int maxWait) {
             this.maxWaitForSessionClose = maxWait;
             return this;
         }
 
         /**
+         * The amount of time in milliseconds to wait for the connection to close before timing out where the default
+         * value is 3000. This timeout allows for a delay to occur in waiting for remaining messages that may still
+         * be returning from the server while a {@link Client#close()} is called.
+         * <p/>
+         * This setting is related to {@link #maxWaitForSessionClose} to some degree. This setting refers to a wait
+         * for standard requests (i.e. queries) but the {@link #maxWaitForSessionClose} refers to a wait for the
+         * "session close" message that occurs after all standard requests have returned (or timed out). There is
+         * generally no need to set {@link #maxWaitForSessionClose} if the server is on 3.3.11 or later as the close
+         * of the connection will trigger the close of the session and the release of resources.
+         */
+        public Builder maxWaitForClose(final int maxWait) {
+            this.maxWaitForClose = maxWait;
+            return this;
+        }
+
+        /**
          * The maximum size in bytes of any request sent to the server.   This number should not exceed the same
          * setting defined on the server.
          */
@@ -1074,6 +1104,7 @@ public final class Cluster {
             connectionPoolSettings.minSize = builder.minConnectionPoolSize;
             connectionPoolSettings.maxWaitForConnection = builder.maxWaitForConnection;
             connectionPoolSettings.maxWaitForSessionClose = builder.maxWaitForSessionClose;
+            connectionPoolSettings.maxWaitForClose = builder.maxWaitForClose;
             connectionPoolSettings.maxContentLength = builder.maxContentLength;
             connectionPoolSettings.reconnectInterval = builder.reconnectInterval;
             connectionPoolSettings.resultIterationBatchSize = builder.resultIterationBatchSize;
@@ -1145,6 +1176,9 @@ public final class Cluster {
             if (builder.maxWaitForSessionClose < 1)
                 throw new IllegalArgumentException("maxWaitForSessionClose must be greater than zero");
 
+            if (builder.maxWaitForClose < 1)
+                throw new IllegalArgumentException("maxWaitForClose must be greater than zero");
+
             if (builder.maxContentLength < 1)
                 throw new IllegalArgumentException("maxContentLength must be greater than zero");
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index ba18ddb..3620fbd 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -63,6 +63,7 @@ final class Connection {
     public static final int MIN_IN_PROCESS = 1;
     public static final int MAX_WAIT_FOR_CONNECTION = 3000;
     public static final int MAX_WAIT_FOR_SESSION_CLOSE = 3000;
+    public static final int MAX_WAIT_FOR_CLOSE = 3000;
     public static final int MAX_CONTENT_LENGTH = 65536;
 
     public static final int RECONNECT_INTERVAL = 1000;
@@ -184,7 +185,7 @@ final class Connection {
                 shutdown(future);
         } else {
             // there may be some pending requests. schedule a job to wait for those to complete and then shutdown
-            new CheckForPending(future).runUntilDone(cluster.executor(), 1000, TimeUnit.MILLISECONDS);
+            new CheckForPending(future).runUntilDone(cluster.executor());
         }
 
         return future;
@@ -320,7 +321,14 @@ final class Connection {
         if (shutdownInitiated.compareAndSet(false, true)) {
             final String connectionInfo = this.getConnectionInfo();
 
-            // maybe this should be delegated back to the Client implementation??? kinda weird to instanceof here.....
+            // this block of code that "closes" the session is deprecated as of 3.3.11 - this message is going to be
+            // removed at 3.5.0. we will instead bind session closing to the close of the channel itself and not have
+            // this secondary operation here which really only acts as a means for clearing resources in a functioning
+            // session. "functioning" in this context means that the session is not locked up with a long running
+            // operation which will delay this close execution which ideally should be more immediate, as in the user
+            // is annoyed that a long run operation is happening and they want an immediate cancellation. that's the
+            // most likely use case. we also get the nice benefit that this if/then code just goes away as the
+            // Connection really shouldn't care about the specific Client implementation.
             if (client instanceof Client.SessionedClient) {
                 final boolean forceClose = client.getSettings().getSession().get().isForceClosed();
                 final RequestMessage closeMessage = client.buildMessage(
@@ -333,7 +341,7 @@ final class Connection {
                     // make sure we get a response here to validate that things closed as expected.  on error, we'll let
                     // the server try to clean up on its own.  the primary error here should probably be related to
                     // protocol issues which should not be something a user has to fuss with.
-                    closed.join().all().get(cluster.connectionPoolSettings().maxWaitForSessionClose, TimeUnit.MILLISECONDS);
+                    closed.join().all().get(cluster.getMaxWaitForSessionClose(), TimeUnit.MILLISECONDS);
                 } catch (TimeoutException ex) {
                     final String msg = String.format(
                             "Timeout while trying to close connection on %s - force closing - server will close session on shutdown or expiration.",
@@ -382,16 +390,18 @@ final class Connection {
     private final class CheckForPending implements Runnable {
         private volatile ScheduledFuture<?> self;
         private final CompletableFuture<Void> future;
+        private long checkUntil = System.currentTimeMillis();
 
         CheckForPending(final CompletableFuture<Void> future) {
             this.future = future;
+            checkUntil = checkUntil + cluster.getMaxWaitForClose();
         }
 
         @Override
         public void run() {
             logger.info("Checking for pending messages to complete before close on {}", this);
 
-            if (isOkToClose()) {
+            if (isOkToClose() || System.currentTimeMillis() > checkUntil) {
                 shutdown(future);
                 boolean interrupted = false;
                 try {
@@ -411,8 +421,8 @@ final class Connection {
             }
         }
 
-        void runUntilDone(final ScheduledExecutorService executor, final long period, final TimeUnit unit) {
-            self = executor.scheduleAtFixedRate(this, period, period, unit);
+        void runUntilDone(final ScheduledExecutorService executor) {
+            self = executor.scheduleAtFixedRate(this, 1000, 1000, TimeUnit.MILLISECONDS);
         }
     }
 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 2ba8cfc..e60de22 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -233,6 +233,12 @@ final class Settings {
             if (connectionPoolConf.containsKey("maxWaitForConnection"))
                 cpSettings.maxWaitForConnection = connectionPoolConf.getInt("maxWaitForConnection");
 
+            if (connectionPoolConf.containsKey("maxWaitForSessionClose"))
+                cpSettings.maxWaitForSessionClose = connectionPoolConf.getInt("maxWaitForSessionClose");
+
+            if (connectionPoolConf.containsKey("maxWaitForClose"))
+                cpSettings.maxWaitForClose = connectionPoolConf.getInt("maxWaitForClose");
+
             if (connectionPoolConf.containsKey("maxContentLength"))
                 cpSettings.maxContentLength = connectionPoolConf.getInt("maxContentLength");
 
@@ -389,10 +395,20 @@ final class Settings {
          * If the connection is using a "session" this setting represents the amount of time in milliseconds to wait
          * for that session to close before timing out where the default value is 3000. Note that the server will
          * eventually clean up dead sessions itself on expiration of the session or during shutdown.
+         *
+         * @deprecated As of release 3.3.11, replaced in essence by {@link #maxWaitForClose}.
          */
+        @Deprecated
         public int maxWaitForSessionClose = Connection.MAX_WAIT_FOR_SESSION_CLOSE;
 
         /**
+         * The amount of time in milliseconds to wait for the connection to close before timing out where the default
+         * value is 3000. This timeout allows for a delay to occur in waiting for remaining messages that may still
+         * be returning from the server while a {@link Client#close()} is called.
+         */
+        public int maxWaitForClose = Connection.MAX_WAIT_FOR_CLOSE;
+
+        /**
          * The maximum length in bytes that a message can be sent to the server. This number can be no greater than
          * the setting of the same name in the server configuration. The default value is 65536.
          */
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
index f41a0fb..24d799b 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.server.op.session;
 
+import io.netty.channel.Channel;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
 import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
@@ -68,6 +69,7 @@ public class Session {
     private final long configuredSessionTimeout;
     private final long configuredPerGraphCloseTimeout;
     private final boolean globalFunctionCacheEnabled;
+    private final Channel boundChannel;
 
     private AtomicBoolean killing = new AtomicBoolean(false);
     private AtomicReference<ScheduledFuture> kill = new AtomicReference<>();
@@ -110,6 +112,16 @@ public class Session {
         this.gremlinExecutor = initializeGremlinExecutor().create();
 
         settings.scriptEngines.keySet().forEach(this::registerMetrics);
+
+        boundChannel = context.getChannelHandlerContext().channel();
+        boundChannel.closeFuture().addListener(future -> manualKill(true));
+    }
+
+    /**
+     * Determines if the supplied {@code Channel} object is the same as the one bound to the {@code Session}.
+     */
+    public boolean isBoundTo(final Channel channel) {
+        return channel == boundChannel;
     }
 
     public GremlinExecutor getGremlinExecutor() {
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index 8d2edd1..3752f48 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -124,10 +124,13 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor {
     /**
      * Session based requests accept a "close" operator in addition to "eval". A close will trigger the session to be
      * killed and any uncommitted transaction to be rolled-back.
-     * @return
      */
     @Override
     public Optional<ThrowingConsumer<Context>> selectOther(final RequestMessage requestMessage) throws OpProcessorException {
+        // deprecated the "close" message at 3.3.11 - should probably leave this check for the "close" token so that
+        // if older versions of the driver connect they won't get an error. can basically just write back a NO_CONTENT
+        // for the immediate term in 3.5.0 and then for some future version remove support for the message completely
+        // and thus disallow older driver versions from connecting at all.
         if (requestMessage.getOp().equals(Tokens.OPS_CLOSE)) {
             // this must be an in-session request
             if (!requestMessage.optionalArgs(Tokens.ARGS_SESSION).isPresent()) {
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 07944e8..9da8cfd 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -168,6 +168,12 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
         // basically, we need one to submit the long run job and one to do the close operation that will cancel the
         // long run job. it is probably possible to do this with some low-level message manipulation but that's
         // probably not necessary
+        //
+        // this test won't work so well once we remove the sending of the session close message from the driver which
+        // got deprecated at 3.3.11 and lock a session to the connection that created it. in that case, two Client
+        // instances won't be able to connect to the same session which is what is happening below. not sure what
+        // form this test should take then especially since transactions will force close when the channel closes.
+        // perhaps it should just be removed.
         final Cluster cluster1 = TestClientFactory.open();
         final Client client1 = cluster1.connect(name.getMethodName());
         client1.submit("graph.addVertex()").all().join();
@@ -201,6 +207,12 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
         // basically, we need one to submit the long run job and one to do the close operation that will cancel the
         // long run job. it is probably possible to do this with some low-level message manipulation but that's
         // probably not necessary
+        //
+        // this test won't work so well once we remove the sending of the session close message from the driver which
+        // got deprecated at 3.3.11 and lock a session to the connection that created it. in that case, two Client
+        // instances won't be able to connect to the same session which is what is happening below. not sure what
+        // form this test should take then especially since transactions will force close when the channel closes.
+        // perhaps it should just be removed.
         final Cluster cluster1 = TestClientFactory.open();
         final Client client1 = cluster1.connect(name.getMethodName());
         client1.submit("graph.addVertex()").all().join();
@@ -320,7 +332,7 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
             cluster.close();
         }
 
-        // there will be on for the timeout and a second for closing the cluster
+        // there will be one for the timeout and a second for closing the cluster
         assertEquals(2, recordingAppender.getMessages().stream()
                 .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
     }