You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/12 12:34:29 UTC

[tinkerpop] branch driver-35 updated: Fixed some problems with closing sessions and waiting for handshakes

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/driver-35 by this push:
     new 0fa85ba  Fixed some problems with closing sessions and waiting for handshakes
0fa85ba is described below

commit 0fa85ba179a10ace865ed5bbd1f51f16ab64617a
Author: stephen <sp...@gmail.com>
AuthorDate: Thu Dec 12 07:33:43 2019 -0500

    Fixed some problems with closing sessions and waiting for handshakes
---
 .../apache/tinkerpop/gremlin/driver/Channelizer.java    | 16 +++++++++++-----
 .../org/apache/tinkerpop/gremlin/driver/Client.java     | 13 ++++++++-----
 .../org/apache/tinkerpop/gremlin/driver/Connection.java |  2 +-
 .../gremlin/driver/handler/WebSocketClientHandler.java  |  2 ++
 .../gremlin/server/handler/OpExecutorHandler.java       | 10 ++++++++++
 .../gremlin/server/GremlinServerSslIntegrateTest.java   | 17 +++++++++++------
 6 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index f5c5301..b41100a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -171,21 +171,27 @@ public interface Channelizer extends ChannelHandler {
         public void connected(final Channel ch) {
             try {
                 // be sure the handshake is done - if the handshake takes longer than the specified time there's
-                // gotta be issues with that  server. a common problem where this comes up: SSL is enabled on the
+                // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the
                 // server, but the client forgot to enable it or perhaps the server is not configured for websockets.
-                final ChannelFuture handshakeFuture = ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture();
+                // we see this because a normal websocket handshake is sent to the server, which can't decode it as
+                // SSL, so no response gets sent back and the handshake has no idea what to do. maxWaitForConnection
+                // is used here in the same way it was previously used as the netty websocket handshaker timeout.
+                // the wait for the overall connection should trigger before this one does.
+                final ChannelFuture handshakeFuture = ((WebSocketClientHandler) (ch.pipeline().get("ws-client-handler"))).handshakeFuture();
                 if (!handshakeFuture.isDone()) {
                     handshakeFuture.addListener(f -> {
                         if (!f.isSuccess()) {
                             throw new ConnectionException(connectionPool.getHost().getHostUri(),
                                     "Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
                         }
-                    }).sync();
+                    }).get(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
                 }
-            }  catch (InterruptedException ex) {
+            } catch (ExecutionException ex) {
+                throw new RuntimeException(ex);
+            } catch (InterruptedException | TimeoutException ex) {
                 // catching the InterruptedException will reset the interrupted flag. This is intentional.
                 throw new RuntimeException(new ConnectionException(connectionPool.getHost().getHostUri(),
-                                                                   "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause()));
+                      "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause()));
             }
         }
     }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 99342c0..699e6e8 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -779,23 +779,26 @@ public abstract class Client {
             final RequestMessage closeMessage = buildMessage(RequestMessage.build(Tokens.OPS_CLOSE)
                                                                            .addArg(Tokens.ARGS_FORCE, forceClose)).create();
 
-            closing.set(CompletableFuture.supplyAsync(() -> {
+            // the close message must be submitted here because once the closing future is set no more messages can be sent.
+            final CompletableFuture<ResultSet> closeRequestFuture = submitAsync(closeMessage);
+
+            closing.set(CompletableFuture.runAsync(() -> {
                 try {
                     // block this up until we get a response from the server or an exception. it might not be accurate
                     // to wait for maxWaitForSessionClose because we wait that long for this future in calls to close()
                     // but in either case we don't want to wait longer than that so perhaps this is still a sensible
                     // wait time - or at least better than something hardcoded. this wait will just expire a bit after
                     // the close() call's expiration....at least i think that's right.
-                    submitAsync(closeMessage).get(
+                    closeRequestFuture.get(
                             cluster.connectionPoolSettings().maxWaitForSessionClose, TimeUnit.MILLISECONDS).all().get();
-                } catch (Exception ignored) {
+                } catch (Exception ex) {
                     // ignored - if the close message doesn't get to the server it's not a real worry. the server will
                     // eventually kill the session
+                    logger.warn("Session close request failed for [" + this.sessionId + "]- the server will close the session once it has been determined to be idle or during shutdown", ex);
                 } finally {
                     connectionPool.closeAsync();
                 }
-                return null;
-            }, cluster.executor()));
+            }));
 
             return closing.get();
         }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index c1a0b5a..417a170 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -37,7 +37,7 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface Connection {
 
-    int DEFAULT_MAX_WAIT_FOR_CONNECTION = 3000;
+    int DEFAULT_MAX_WAIT_FOR_CONNECTION = 30000;
     int DEFAULT_MAX_WAIT_FOR_SESSION_CLOSE = 3000;
     int DEFAULT_MAX_CONTENT_LENGTH = 65536;
     int DEFAULT_RECONNECT_INTERVAL = 1000;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 95f6923..e79944e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -29,6 +29,8 @@ import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.ssl.SslCompletionEvent;
+import io.netty.handler.ssl.SslHandshakeCompletionEvent;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.util.ReferenceCountUtil;
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
index 068c332..60e0177 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.server.handler;
 
+import io.netty.handler.ssl.NotSslRecordException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
@@ -82,4 +83,13 @@ public class OpExecutorHandler extends SimpleChannelInboundHandler<Pair<RequestM
             ReferenceCountUtil.release(objects);
         }
     }
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+        // basic catch-all for server exceptions: log them, and close the channel when the cause wraps a NotSslRecordException
+        logger.error(cause.getMessage(), cause);
+        if (cause.getCause() instanceof NotSslRecordException) {
+            ctx.close();
+        }
+    }
 }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
index ba562a1..7dfe566 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
@@ -38,6 +38,10 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests that are expected to fail shorten the {@code maxWaitForConnection} setting as there is really no need to wait
+ * the full time just to get a failure.
+ */
 public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrationTest {
 
     /**
@@ -122,7 +126,6 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
         }
     }
 
-
     @Test
     public void shouldEnableSsl() {
         final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS).sslSkipCertValidation(true).create();
@@ -156,7 +159,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
 
     @Test
     public void shouldEnableSslButFailIfClientConnectsWithoutIt() {
-        final Cluster cluster = TestClientFactory.build().enableSsl(false).create();
+        final Cluster cluster = TestClientFactory.build().enableSsl(false).maxWaitForConnection(3000).create();
         final Client client = cluster.connect();
 
         try {
@@ -199,7 +202,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
     @Test
     public void shouldEnableSslAndClientCertificateAuthAndFailWithoutCert() {
         final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS)
-                .keyStoreType(KEYSTORE_TYPE_JKS).sslSkipCertValidation(true).create();
+                .keyStoreType(KEYSTORE_TYPE_JKS).sslSkipCertValidation(true).maxWaitForConnection(3000).create();
         final Client client = cluster.connect();
 
         try {
@@ -216,7 +219,8 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
     @Test
     public void shouldEnableSslAndClientCertificateAuthAndFailWithoutTrustedClientCert() {
         final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_CLIENT_KEY).keyStorePassword(KEY_PASS)
-                .keyStoreType(KEYSTORE_TYPE_JKS).trustStore(JKS_CLIENT_TRUST).trustStorePassword(KEY_PASS).create();
+                .keyStoreType(KEYSTORE_TYPE_JKS).trustStore(JKS_CLIENT_TRUST).trustStorePassword(KEY_PASS)
+                .maxWaitForConnection(3000).create();
         final Client client = cluster.connect();
 
         try {
@@ -233,7 +237,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
     @Test
     public void shouldEnableSslAndFailIfProtocolsDontMatch() {
         final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS)
-                .sslSkipCertValidation(true).sslEnabledProtocols(Arrays.asList("TLSv1.2")).create();
+                .sslSkipCertValidation(true).sslEnabledProtocols(Arrays.asList("TLSv1.2")).maxWaitForConnection(3000).create();
         final Client client = cluster.connect();
 
         try {
@@ -250,7 +254,8 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
     @Test
     public void shouldEnableSslAndFailIfCiphersDontMatch() {
         final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS)
-                .sslSkipCertValidation(true).sslCipherSuites(Arrays.asList("SSL_RSA_WITH_RC4_128_SHA")).create();
+                .sslSkipCertValidation(true).sslCipherSuites(Arrays.asList("SSL_RSA_WITH_RC4_128_SHA"))
+                .maxWaitForConnection(3000).create();
         final Client client = cluster.connect();
 
         try {