You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/09/28 16:57:01 UTC
[03/44] tinkerpop git commit: Add keep-alive functionality to Java
Driver.
Add keep-alive functionality to Java Driver.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d881484a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d881484a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d881484a
Branch: refs/heads/TINKERPOP-944
Commit: d881484a40ef7c5924e97a1adce7d0a7bf6654ea
Parents: 01d035e
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Sep 22 07:48:59 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Sep 22 07:49:45 2016 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
.../src/reference/gremlin-applications.asciidoc | 1 +
.../upgrade/release-3.2.x-incubating.asciidoc | 10 +++
.../tinkerpop/gremlin/driver/Channelizer.java | 39 ++++++++---
.../tinkerpop/gremlin/driver/Cluster.java | 21 ++++++
.../tinkerpop/gremlin/driver/Connection.java | 36 +++++++++-
.../gremlin/driver/ConnectionPool.java | 2 +-
.../tinkerpop/gremlin/driver/Settings.java | 12 +++-
.../driver/handler/WebSocketClientHandler.java | 1 +
.../WebSocketGremlinResponseDecoder.java | 5 +-
.../server/GremlinDriverIntegrateTest.java | 69 ++++++++++++++++++++
11 files changed, 182 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 394cc33..9b323af 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -33,6 +33,7 @@ TinkerPop 3.2.3 (Release Date: NOT OFFICIALLY RELEASED YET)
* `TraversalRing` returns a `null` if it does not contain traversals (previously `IdentityTraversal`).
* Fixed a `JavaTranslator` bug where `Bytecode` instructions were being mutated during translation.
* Added `Path` to Gremlin-Python with respective GraphSON 2.0 deserializer.
+* Added "keep-alive" functionality to the Java driver, which will send a heartbeat to the server when normal request activity on a connection stops for a period of time.
* Renamed the `empty.result.indicator` preference to `result.indicator.null` in Gremlin Console
* If `result.indicator.null` is set to an empty string, then no "result line" is printed in Gremlin Console.
* VertexPrograms can now declare traverser requirements, e.g. to have access to the path when used with `.program()`.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/docs/src/reference/gremlin-applications.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc
index bff9f0f..ce15d66 100644
--- a/docs/src/reference/gremlin-applications.asciidoc
+++ b/docs/src/reference/gremlin-applications.asciidoc
@@ -687,6 +687,7 @@ The following table describes the various configuration options for the Gremlin
|Key |Description |Default
|connectionPool.channelizer |The fully qualified classname of the client `Channelizer` that defines how to connect to the server. |`Channelizer.WebSocketChannelizer`
|connectionPool.enableSsl |Determines if SSL should be enabled or not. If enabled on the server then it must be enabled on the client. |false
+|connectionPool.keepAliveInterval |Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. Set to zero to disable this feature. |1800000
|connectionPool.keyCertChainFile |The X.509 certificate chain file in PEM format. |_none_
|connectionPool.keyFile |The `PKCS#8` private key file in PEM format. |_none_
|connectionPool.keyPassword |The password of the `keyFile` if it's not password-protected |_none_
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
index f9c62e2..e85aeb9 100644
--- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
@@ -59,6 +59,16 @@ gremlin>
See: link:https://issues.apache.org/jira/browse/TINKERPOP-1409[TINKERPOP-1409]
+Java Driver Keep-Alive
+^^^^^^^^^^^^^^^^^^^^^^
+
+The Java Driver now has a `keepAliveInterval` setting, which controls the amount of time in milliseconds it should wait
+on an inactive connection before it sends a message to the server to keep the connection maintained. This should help
+environments that use a load balancer in front of Gremlin Server by ensuring connections are actively maintained even
+during periods of inactivity.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1249[TINKERPOP-1249]
+
Where Step Supports By-Modulation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 40be11c..6ed8e0f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -20,9 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
@@ -39,10 +37,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.File;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@@ -70,6 +65,21 @@ public interface Channelizer extends ChannelHandler {
public void close(final Channel channel);
/**
+ * Create a message for the driver to use as a "keep-alive" for the connection. This method will only be used if
+ * {@link #supportsKeepAlive()} is {@code true}.
+ */
+ public default Object createKeepAliveMessage() {
+ return null;
+ }
+
+ /**
+ * Determines if the channelizer supports a method for keeping the connection to the server alive.
+ */
+ public default boolean supportsKeepAlive() {
+ return false;
+ }
+
+ /**
* Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
* handshake.
*/
@@ -80,8 +90,6 @@ public interface Channelizer extends ChannelHandler {
* Base implementation of the client side {@link Channelizer}.
*/
abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer {
- private static final Logger logger = LoggerFactory.getLogger(AbstractChannelizer.class);
-
protected Connection connection;
protected Cluster cluster;
private ConcurrentMap<UUID, ResultQueue> pending;
@@ -152,6 +160,21 @@ public interface Channelizer extends ChannelHandler {
}
/**
+ * Keep-alive is supported through the ping/pong websocket protocol.
+ *
+ * @see <a href=https://tools.ietf.org/html/rfc6455#section-5.5.2>IETF RFC 6455</a>
+ */
+ @Override
+ public boolean supportsKeepAlive() {
+ return true;
+ }
+
+ @Override
+ public Object createKeepAliveMessage() {
+ return new PingWebSocketFrame();
+ }
+
+ /**
* Sends a {@code CloseWebSocketFrame} to the server for the specified channel.
*/
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 9c08c3c..f79e719 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -163,6 +163,7 @@ public final class Cluster {
.port(settings.port)
.enableSsl(settings.connectionPool.enableSsl)
.trustCertificateChainFile(settings.connectionPool.trustCertChainFile)
+ .keepAliveInterval(settings.connectionPool.keepAliveInterval)
.keyCertChainFile(settings.connectionPool.keyCertChainFile)
.keyFile(settings.connectionPool.keyFile)
.keyPassword(settings.connectionPool.keyPassword)
@@ -388,6 +389,14 @@ public final class Cluster {
}
/**
+ * Gets time in milliseconds to wait after the last message is sent over a connection before sending a keep-alive
+ * message to the server.
+ */
+ public long getKeepAliveInterval() {
+ return manager.connectionPoolSettings.keepAliveInterval;
+ }
+
+ /**
* Specifies the load balancing strategy to use on the client side.
*/
public Class<? extends LoadBalancingStrategy> getLoadBalancingStrategy() {
@@ -478,6 +487,7 @@ public final class Cluster {
private int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
private int reconnectInterval = Connection.RECONNECT_INTERVAL;
private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
+ private long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
private String channelizer = Channelizer.WebSocketChannelizer.class.getName();
private boolean enableSsl = false;
private String trustCertChainFile = null;
@@ -573,6 +583,16 @@ public final class Cluster {
}
/**
+ * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This
+ * setting is only relevant to {@link Channelizer} implementations that return {@code true} for
+ * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature.
+ */
+ public Builder keepAliveInterval(final long keepAliveInterval) {
+ this.keepAliveInterval = keepAliveInterval;
+ return this;
+ }
+
+ /**
* The X.509 certificate chain file in PEM format.
*/
public Builder keyCertChainFile(final String keyCertChainFile) {
@@ -871,6 +891,7 @@ public final class Cluster {
connectionPoolSettings.keyCertChainFile = builder.keyCertChainFile;
connectionPoolSettings.keyFile = builder.keyFile;
connectionPoolSettings.keyPassword = builder.keyPassword;
+ connectionPoolSettings.keepAliveInterval = builder.keepAliveInterval;
connectionPoolSettings.channelizer = builder.channelizer;
sslContextOptional = Optional.ofNullable(builder.sslContext);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 9dc93a7..1ef9b98 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -55,6 +56,7 @@ final class Connection {
private final Cluster cluster;
private final Client client;
private final ConnectionPool pool;
+ private final long keepAliveInterval;
public static final int MAX_IN_PROCESS = 4;
public static final int MIN_IN_PROCESS = 1;
@@ -64,6 +66,7 @@ final class Connection {
public static final int RECONNECT_INITIAL_DELAY = 1000;
public static final int RECONNECT_INTERVAL = 1000;
public static final int RESULT_ITERATION_BATCH_SIZE = 64;
+ public static final long KEEP_ALIVE_INTERVAL = 1800000;
/**
* When a {@code Connection} is borrowed from the pool, this number is incremented to indicate the number of
@@ -82,6 +85,7 @@ final class Connection {
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);
+ private final AtomicReference<ScheduledFuture> keepAliveFuture = new AtomicReference<>();
public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
this.uri = uri;
@@ -89,6 +93,7 @@ final class Connection {
this.client = pool.getClient();
this.pool = pool;
this.maxInProcess = maxInProcess;
+ this.keepAliveInterval = pool.settings().keepAliveInterval;
connectionLabel = String.format("Connection{host=%s}", pool.host);
@@ -153,6 +158,10 @@ final class Connection {
if (!closeFuture.compareAndSet(null, future))
return closeFuture.get();
+ // stop any pings being sent at the server for keep-alive
+ final ScheduledFuture keepAlive = keepAliveFuture.get();
+ if (keepAlive != null) keepAlive.cancel(true);
+
// make sure all requests in the queue are fully processed before killing. if they are then shutdown
// can be immediate. if not this method will signal the readCompleted future defined in the write()
// operation to check if it can close. in this way the connection no longer receives writes, but
@@ -181,7 +190,8 @@ final class Connection {
// once there is a completed write, then create a traverser for the result set and complete
// the promise so that the client knows that that it can start checking for results.
final Connection thisConnection = this;
- final ChannelPromise promise = channel.newPromise()
+
+ final ChannelPromise requestPromise = channel.newPromise()
.addListener(f -> {
if (!f.isSuccess()) {
if (logger.isDebugEnabled())
@@ -234,9 +244,29 @@ final class Connection {
requestMessage, pool.host));
}
});
- channel.writeAndFlush(requestMessage, promise);
+ channel.writeAndFlush(requestMessage, requestPromise);
+
+ // try to keep the connection alive if the channel allows such things - websockets will
+ if (channelizer.supportsKeepAlive() && keepAliveInterval > 0) {
+
+ final ScheduledFuture oldKeepAliveFuture = keepAliveFuture.getAndSet(cluster.executor().scheduleAtFixedRate(() -> {
+ logger.debug("Request sent to server to keep {} alive", thisConnection);
+ try {
+ channel.writeAndFlush(channelizer.createKeepAliveMessage());
+ } catch (Exception ex) {
+ // will just log this for now - a future real request can be responsible for the failure that
+ // marks the host as dead. this also may not mean the host is actually dead. more robust handling
+ // is in play for real requests, not this simple ping
+ logger.warn(String.format("Keep-alive did not succeed on %s", thisConnection), ex);
+ }
+ }, keepAliveInterval, keepAliveInterval, TimeUnit.MILLISECONDS));
+
+ // try to cancel the old future if it's still un-executed - no need to ping since a new write has come
+ // through on the connection
+ if (oldKeepAliveFuture != null) oldKeepAliveFuture.cancel(true);
+ }
- return promise;
+ return requestPromise;
}
public void returnToPool() {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 9955e82..f0d9044 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -170,7 +170,7 @@ final class ConnectionPool {
logger.debug("Attempting to return {} on {}", connection, host);
if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
- int borrowed = connection.borrowed.decrementAndGet();
+ final int borrowed = connection.borrowed.decrementAndGet();
if (connection.isDead()) {
logger.debug("Marking {} as dead", this.host);
considerUnavailable();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 43014ce..41a697c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.yaml.snakeyaml.TypeDescription;
import org.yaml.snakeyaml.Yaml;
@@ -33,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Properties;
import java.util.stream.Collectors;
/**
@@ -207,6 +205,9 @@ final class Settings {
if (connectionPoolConf.containsKey("resultIterationBatchSize"))
cpSettings.resultIterationBatchSize = connectionPoolConf.getInt("resultIterationBatchSize");
+ if (connectionPoolConf.containsKey("keepAliveInterval"))
+ cpSettings.keepAliveInterval = connectionPoolConf.getLong("keepAliveInterval");
+
settings.connectionPool = cpSettings;
}
@@ -251,6 +252,13 @@ final class Settings {
public int maxSize = ConnectionPool.MAX_POOL_SIZE;
/**
+ * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This
+ * setting is only relevant to {@link Channelizer} implementations that return {@code true} for
+ * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature.
+ */
+ public long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
+
+ /**
* A connection under low use can be destroyed. This setting determines the threshold for determining when
* that connection can be released and is defaulted to 8.
*/
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 922775e..5ba0f1b 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -86,6 +86,7 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
if (frame instanceof TextWebSocketFrame) {
ctx.fireChannelRead(frame.retain(2));
} else if (frame instanceof PongWebSocketFrame) {
+ logger.debug("Received response from keep-alive request");
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.fireChannelRead(frame.retain(2));
} else if (frame instanceof CloseWebSocketFrame)
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
index 0f24a9a..383e5a5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver.handler;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
import io.netty.channel.ChannelHandler;
@@ -47,10 +48,12 @@ public final class WebSocketGremlinResponseDecoder extends MessageToMessageDecod
if (webSocketFrame instanceof BinaryWebSocketFrame) {
final BinaryWebSocketFrame tf = (BinaryWebSocketFrame) webSocketFrame;
objects.add(serializer.deserializeResponse(tf.content()));
- } else {
+ } else if (webSocketFrame instanceof TextWebSocketFrame){
final TextWebSocketFrame tf = (TextWebSocketFrame) webSocketFrame;
final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
objects.add(textSerializer.deserializeResponse(tf.text()));
+ } else {
+ throw new RuntimeException(String.format("WebSocket channel does not handle this type of message: %s", webSocketFrame.getClass().getName()));
}
} finally {
ReferenceCountUtil.release(webSocketFrame);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 04faa29..96cde54 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.server;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Level;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.driver.Channelizer;
import org.apache.tinkerpop.gremlin.driver.Client;
@@ -27,6 +28,7 @@ import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
@@ -35,11 +37,14 @@ import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import groovy.json.JsonBuilder;
import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.hamcrest.core.IsInstanceOf;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +73,9 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.AllOf.allOf;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -83,6 +91,37 @@ import static org.hamcrest.core.StringStartsWith.startsWith;
*/
public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegrationTest {
private static final Logger logger = LoggerFactory.getLogger(GremlinDriverIntegrateTest.class);
+
+ private Log4jRecordingAppender recordingAppender = null;
+ private Level previousLogLevel;
+
+ @Before
+ public void setupForEachTest() {
+ recordingAppender = new Log4jRecordingAppender();
+ final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+
+ if (name.getMethodName().equals("shouldKeepAliveForWebSockets")) {
+ final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class);
+ previousLogLevel = webSocketClientHandlerLogger.getLevel();
+ webSocketClientHandlerLogger.setLevel(Level.DEBUG);
+ }
+
+ rootLogger.addAppender(recordingAppender);
+ }
+
+ @After
+ public void teardownForEachTest() {
+ final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+
+ if (name.getMethodName().equals("shouldKeepAliveForWebSockets")) {
+ final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class);
+ previousLogLevel = webSocketClientHandlerLogger.getLevel();
+ webSocketClientHandlerLogger.setLevel(previousLogLevel);
+ }
+
+ rootLogger.removeAppender(recordingAppender);
+ }
+
/**
* Configure specific Gremlin Server settings for specific tests.
*/
@@ -137,6 +176,36 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
}
@Test
+ public void shouldKeepAliveForWebSockets() throws Exception {
+ // keep the connection pool size at 1 to remove the possibility of lots of connections trying to ping which will
+ // complicate the assertion logic
+ final Cluster cluster = Cluster.build().
+ minConnectionPoolSize(1).
+ maxConnectionPoolSize(1).
+ keepAliveInterval(1000).create();
+ final Client client = cluster.connect();
+
+ // fire up lots of requests so as to schedule/deschedule lots of ping jobs
+ for (int ix = 0; ix < 500; ix++) {
+ assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+ }
+
+ // don't send any messages for a bit so that the driver pings in the background
+ Thread.sleep(3000);
+
+ // make sure no bonus messages sorta fire off once we get back to sending requests
+ for (int ix = 0; ix < 500; ix++) {
+ assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+ }
+
+ // there really shouldn't be more than 3 of these sent. should definitely be at least one though
+ final long messages = recordingAppender.getMessages().stream().filter(m -> m.contains("Received response from keep-alive request")).count();
+ assertThat(messages, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
+
+ cluster.close();
+ }
+
+ @Test
public void shouldEventuallySucceedAfterChannelLevelError() throws Exception {
final Cluster cluster = Cluster.build().addContactPoint("localhost")
.reconnectIntialDelay(500)