You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/12 12:34:29 UTC
[tinkerpop] branch driver-35 updated: Fixed some problems with
closing sessions and waiting for handshakes
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/driver-35 by this push:
new 0fa85ba Fixed some problems with closing sessions and waiting for handshakes
0fa85ba is described below
commit 0fa85ba179a10ace865ed5bbd1f51f16ab64617a
Author: stephen <sp...@gmail.com>
AuthorDate: Thu Dec 12 07:33:43 2019 -0500
Fixed some problems with closing sessions and waiting for handshakes
---
.../apache/tinkerpop/gremlin/driver/Channelizer.java | 16 +++++++++++-----
.../org/apache/tinkerpop/gremlin/driver/Client.java | 13 ++++++++-----
.../org/apache/tinkerpop/gremlin/driver/Connection.java | 2 +-
.../gremlin/driver/handler/WebSocketClientHandler.java | 2 ++
.../gremlin/server/handler/OpExecutorHandler.java | 10 ++++++++++
.../gremlin/server/GremlinServerSslIntegrateTest.java | 17 +++++++++++------
6 files changed, 43 insertions(+), 17 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index f5c5301..b41100a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -171,21 +171,27 @@ public interface Channelizer extends ChannelHandler {
public void connected(final Channel ch) {
try {
// be sure the handshake is done - if the handshake takes longer than the specified time there's
- // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the
+ // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the
// server, but the client forgot to enable it or perhaps the server is not configured for websockets.
- final ChannelFuture handshakeFuture = ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture();
+ // we see this because a normal websocket handshake is sent to the server and it can't be decoded by
+ // SSL and no response gets sent back so the handshake has no idea what to do. used the
+ // maxWaitForConnection here in the same way it was used as the netty websocket handshaker timeout.
+ // the wait for the overall connection should trigger before this.
+ final ChannelFuture handshakeFuture = ((WebSocketClientHandler) (ch.pipeline().get("ws-client-handler"))).handshakeFuture();
if (!handshakeFuture.isDone()) {
handshakeFuture.addListener(f -> {
if (!f.isSuccess()) {
throw new ConnectionException(connectionPool.getHost().getHostUri(),
"Could not complete websocket handshake - ensure that client protocol matches server", f.cause());
}
- }).sync();
+ }).get(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
}
- } catch (InterruptedException ex) {
+ } catch (ExecutionException ex) {
+ throw new RuntimeException(ex);
+ } catch (InterruptedException | TimeoutException ex) {
// catching the InterruptedException will reset the interrupted flag. This is intentional.
throw new RuntimeException(new ConnectionException(connectionPool.getHost().getHostUri(),
- "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause()));
+ "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause()));
}
}
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 99342c0..699e6e8 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -779,23 +779,26 @@ public abstract class Client {
final RequestMessage closeMessage = buildMessage(RequestMessage.build(Tokens.OPS_CLOSE)
.addArg(Tokens.ARGS_FORCE, forceClose)).create();
- closing.set(CompletableFuture.supplyAsync(() -> {
+ // need to submit from here because after the future is set to closing we can't send anymore messages.
+ final CompletableFuture<ResultSet> closeRequestFuture = submitAsync(closeMessage);
+
+ closing.set(CompletableFuture.runAsync(() -> {
try {
// block this up until we get a response from the server or an exception. it might not be accurate
// to wait for maxWaitForSessionClose because we wait that long for this future in calls to close()
// but in either case we don't want to wait longer than that so perhaps this is still a sensible
// wait time - or at least better than something hardcoded. this wait will just expire a bit after
// the close() call's expiration....at least i think that's right.
- submitAsync(closeMessage).get(
+ closeRequestFuture.get(
cluster.connectionPoolSettings().maxWaitForSessionClose, TimeUnit.MILLISECONDS).all().get();
- } catch (Exception ignored) {
+ } catch (Exception ex) {
// ignored - if the close message doesn't get to the server it's not a real worry. the server will
// eventually kill the session
+ logger.warn("Session close request failed for [" + this.sessionId + "]- the server will close the session once it has been determined to be idle or during shutdown", ex);
} finally {
connectionPool.closeAsync();
}
- return null;
- }, cluster.executor()));
+ }));
return closing.get();
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index c1a0b5a..417a170 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -37,7 +37,7 @@ import java.util.concurrent.CompletableFuture;
*/
public interface Connection {
- int DEFAULT_MAX_WAIT_FOR_CONNECTION = 3000;
+ int DEFAULT_MAX_WAIT_FOR_CONNECTION = 30000;
int DEFAULT_MAX_WAIT_FOR_SESSION_CLOSE = 3000;
int DEFAULT_MAX_CONTENT_LENGTH = 65536;
int DEFAULT_RECONNECT_INTERVAL = 1000;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 95f6923..e79944e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -29,6 +29,8 @@ import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.ssl.SslCompletionEvent;
+import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
index 068c332..60e0177 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.server.handler;
+import io.netty.handler.ssl.NotSslRecordException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
@@ -82,4 +83,13 @@ public class OpExecutorHandler extends SimpleChannelInboundHandler<Pair<RequestM
ReferenceCountUtil.release(objects);
}
}
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+ // basic catch all for server exceptions
+ logger.error(cause.getMessage(), cause);
+ if (cause.getCause() instanceof NotSslRecordException) {
+ ctx.close();
+ }
+ }
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
index ba562a1..7dfe566 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
@@ -38,6 +38,10 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+/**
+ * Tests we know will fail use a shortened wait-for-connection setting, as there is really no need to wait the full
+ * time just to get a failure.
+ */
public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrationTest {
/**
@@ -122,7 +126,6 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
}
}
-
@Test
public void shouldEnableSsl() {
final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS).sslSkipCertValidation(true).create();
@@ -156,7 +159,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
@Test
public void shouldEnableSslButFailIfClientConnectsWithoutIt() {
- final Cluster cluster = TestClientFactory.build().enableSsl(false).create();
+ final Cluster cluster = TestClientFactory.build().enableSsl(false).maxWaitForConnection(3000).create();
final Client client = cluster.connect();
try {
@@ -199,7 +202,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
@Test
public void shouldEnableSslAndClientCertificateAuthAndFailWithoutCert() {
final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS)
- .keyStoreType(KEYSTORE_TYPE_JKS).sslSkipCertValidation(true).create();
+ .keyStoreType(KEYSTORE_TYPE_JKS).sslSkipCertValidation(true).maxWaitForConnection(3000).create();
final Client client = cluster.connect();
try {
@@ -216,7 +219,8 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
@Test
public void shouldEnableSslAndClientCertificateAuthAndFailWithoutTrustedClientCert() {
final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_CLIENT_KEY).keyStorePassword(KEY_PASS)
- .keyStoreType(KEYSTORE_TYPE_JKS).trustStore(JKS_CLIENT_TRUST).trustStorePassword(KEY_PASS).create();
+ .keyStoreType(KEYSTORE_TYPE_JKS).trustStore(JKS_CLIENT_TRUST).trustStorePassword(KEY_PASS)
+ .maxWaitForConnection(3000).create();
final Client client = cluster.connect();
try {
@@ -233,7 +237,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
@Test
public void shouldEnableSslAndFailIfProtocolsDontMatch() {
final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS)
- .sslSkipCertValidation(true).sslEnabledProtocols(Arrays.asList("TLSv1.2")).create();
+ .sslSkipCertValidation(true).sslEnabledProtocols(Arrays.asList("TLSv1.2")).maxWaitForConnection(3000).create();
final Client client = cluster.connect();
try {
@@ -250,7 +254,8 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat
@Test
public void shouldEnableSslAndFailIfCiphersDontMatch() {
final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS)
- .sslSkipCertValidation(true).sslCipherSuites(Arrays.asList("SSL_RSA_WITH_RC4_128_SHA")).create();
+ .sslSkipCertValidation(true).sslCipherSuites(Arrays.asList("SSL_RSA_WITH_RC4_128_SHA"))
+ .maxWaitForConnection(3000).create();
final Client client = cluster.connect();
try {