You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/04/05 16:58:43 UTC
[tinkerpop] 01/07: TINKERPOP-2245 Added UnifiedChannelizer
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch TINKERPOP-2245
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 0fa8f84d31b9b47698f6a27db48dbf9ac6a0cd89
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Thu Mar 25 15:30:25 2021 -0400
TINKERPOP-2245 Added UnifiedChannelizer
The UnifiedChannelizer consolidates gremlin execution threadpools and unifies/streamlines server request processing between sessionless and sessionful requests.
---
.travis.yml | 4 +
CHANGELOG.asciidoc | 1 +
docs/src/upgrade/release-3.5.x.asciidoc | 11 +
.../apache/tinkerpop/gremlin/driver/Handler.java | 1 -
.../gremlin/server/AbstractChannelizer.java | 29 +-
.../apache/tinkerpop/gremlin/server/Context.java | 51 ++
.../tinkerpop/gremlin/server/GremlinServer.java | 13 +-
.../apache/tinkerpop/gremlin/server/Settings.java | 32 +
.../gremlin/server/channel/HttpChannelizer.java | 4 +-
.../gremlin/server/channel/UnifiedChannelizer.java | 77 +++
.../server/channel/WebSocketChannelizer.java | 12 +-
.../handler/AbstractAuthenticationHandler.java | 12 +-
.../gremlin/server/handler/AbstractRexster.java | 719 +++++++++++++++++++++
.../handler/HttpBasicAuthenticationHandler.java | 11 +-
.../gremlin/server/handler/MultiRexster.java | 207 ++++++
.../tinkerpop/gremlin/server/handler/Rexster.java | 84 +++
.../SaslAndHttpBasicAuthenticationHandler.java | 25 +-
.../server/handler/SaslAuthenticationHandler.java | 11 +-
.../gremlin/server/handler/SingleRexster.java | 73 +++
.../gremlin/server/handler/UnifiedHandler.java | 283 ++++++++
.../handler/WsAndHttpChannelizerHandler.java | 6 +-
.../AbstractGremlinServerIntegrationTest.java | 31 +-
.../tinkerpop/gremlin/server/ContextTest.java | 3 +-
.../gremlin/server/GremlinDriverIntegrateTest.java | 68 +-
...emlinServerAuditLogDeprecatedIntegrateTest.java | 5 +-
.../server/GremlinServerAuditLogIntegrateTest.java | 4 +-
.../server/GremlinServerAuthKrb5IntegrateTest.java | 2 +-
.../server/GremlinServerAuthzIntegrateTest.java | 3 +
.../server/GremlinServerHttpIntegrateTest.java | 4 +-
.../gremlin/server/GremlinServerIntegrateTest.java | 30 +-
.../server/GremlinServerSessionIntegrateTest.java | 73 ++-
.../channel/HttpChannelizerIntegrateTest.java | 5 +
...t.java => UnifiedChannelizerIntegrateTest.java} | 42 +-
gremlin-tools/gremlin-benchmark/pom.xml | 5 -
hadoop-gremlin/pom.xml | 5 -
35 files changed, 1827 insertions(+), 119 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index a1456e7..ff3a058 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -60,6 +60,10 @@ jobs:
name: "gremlin server"
- script:
- "mvn clean install -q -DskipTests -Dci"
+ - "travis_wait 60 mvn verify -pl :gremlin-server -DskipTests -DskipIntegrationTests=false -DincludeNeo4j -DtestUnified=true"
+ name: "gremlin server - unified"
+ - script:
+ - "mvn clean install -q -DskipTests -Dci"
- "mvn verify -pl :gremlin-console -DskipTests -DskipIntegrationTests=false"
name: "gremlin console"
- script:
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b15b0ab..041c278 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -30,6 +30,7 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
* Added a fully shaded version of `gremlin-driver`.
* Exposed websocket connection status in JavaScript driver.
* Fixed a bug where spark-gremlin was not re-attaching properties when using `dedup()`.
+* Fixed a bug in `WsAndHttpChannelizer` pipeline configuration where failed object aggregation could not write back HTTP responses.
* Ensured better consistency of the use of `null` as arguments to mutation steps.
* Added a `ResponseStatusCode` to indicate that a client should retry its request.
* Added `TemporaryException` interface to indicate that a transaction can be retried.
diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc
index 77a7ce0..3b99d58 100644
--- a/docs/src/upgrade/release-3.5.x.asciidoc
+++ b/docs/src/upgrade/release-3.5.x.asciidoc
@@ -410,6 +410,17 @@ these values are not hashable and will result in an error. By introducing a `Has
See: link:https://issues.apache.org/jira/browse/TINKERPOP-2395[TINKERPOP-2395],
link:https://issues.apache.org/jira/browse/TINKERPOP-2407[TINKERPOP-2407]
+==== Gremlin Server UnifiedChannelizer
+
+Just some notes for later:
+
+* UnifiedChannelizer technically replaces all existing implementations but is not yet the default
+* Some new settings related to it: maxParameters, sessionLifetimeTimeout, useGlobalFunctionCacheForSessions, useCommonEngineForSessions
+* Session behavior shifts slightly under this channelizer for async calls, where a failure will mean that the session
+will close, remaining requests in the queue will be ignored and rollback will occur.
+* care should be taken with strict transaction management and multi-graph transactions (which aren't real - not a new thing)
+* absolute max lifetime of a session is a new thing
+
==== Gremlin Server Audit Logging
The `authentication.enableAuditlog` configuration property is deprecated, but replaced by the `enableAuditLog` property
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index aa15597..11bf8b4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -222,7 +222,6 @@ final class Handler {
final ResultQueue queue = pending.get(response.getRequestId());
if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
final Object data = response.getResult().getData();
- final Map<String,Object> meta = response.getResult().getMeta();
// this is a "result" from the server which is either the result of a script or a
// serialized traversal
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
index 1e2287ea..ee02bd8 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
@@ -50,6 +50,7 @@ import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Constructor;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
@@ -98,6 +99,7 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
public static final String PIPELINE_AUTHORIZER = "authorizer";
public static final String PIPELINE_REQUEST_HANDLER = "request-handler";
public static final String PIPELINE_HTTP_RESPONSE_ENCODER = "http-response-encoder";
+ public static final String PIPELINE_HTTP_AGGREGATOR = "http-aggregator";
public static final String PIPELINE_WEBSOCKET_SERVER_COMPRESSION = "web-socket-server-compression-handler";
protected static final String PIPELINE_SSL = "ssl";
@@ -182,10 +184,29 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
protected AbstractAuthenticationHandler createAuthenticationHandler(final Settings settings) {
try {
final Class<?> clazz = Class.forName(settings.authentication.authenticationHandler);
- final Class[] constructorArgs = new Class[2];
- constructorArgs[0] = Authenticator.class;
- constructorArgs[1] = Settings.class;
- return (AbstractAuthenticationHandler) clazz.getDeclaredConstructor(constructorArgs).newInstance(authenticator, settings);
+ AbstractAuthenticationHandler aah;
+ try {
+ // the three arg constructor is the new form as a handler may need the authorizer in some cases
+ final Class<?>[] threeArgForm = new Class[]{Authenticator.class, Authorizer.class, Settings.class};
+ final Constructor<?> twoArgConstructor = clazz.getDeclaredConstructor(threeArgForm);
+ return (AbstractAuthenticationHandler) twoArgConstructor.newInstance(authenticator, authorizer, settings);
+ } catch (Exception threeArgEx) {
+ try {
+ // the two arg constructor is the "old form" that existed prior to Authorizers. should probably
+ // deprecate this form
+ final Class<?>[] twoArgForm = new Class[]{Authenticator.class, Settings.class};
+ final Constructor<?> twoArgConstructor = clazz.getDeclaredConstructor(twoArgForm);
+
+ if (authorizer != null) {
+ logger.warn("There is an authorizer configured but the {} does not have a constructor of ({}, {}, {}) so it cannot be added",
+ clazz.getName(), Authenticator.class.getSimpleName(), Authorizer.class.getSimpleName(), Settings.class.getSimpleName());
+ }
+
+ return (AbstractAuthenticationHandler) twoArgConstructor.newInstance(authenticator, settings);
+ } catch (Exception twoArgEx) {
+ throw twoArgEx;
+ }
+ }
} catch (Exception ex) {
logger.warn(ex.getMessage());
throw new IllegalStateException(String.format("Could not create/configure AuthenticationHandler %s", settings.authentication.authenticationHandler), ex);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 902a788..fcd2072 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -18,16 +18,21 @@
*/
package org.apache.tinkerpop.gremlin.server;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinScriptChecker;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.server.handler.Frame;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +50,11 @@ public class Context {
private final GremlinExecutor gremlinExecutor;
private final ScheduledExecutorService scheduledExecutorService;
private final AtomicBoolean finalResponseWritten = new AtomicBoolean();
+ private final long requestTimeout;
+ private final RequestContentType requestContentType;
+ private final Object gremlinArgument;
+
+ public enum RequestContentType { BYTECODE, SCRIPT, UNKNOWN }
public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx,
final Settings settings, final GraphManager graphManager,
@@ -55,6 +65,23 @@ public class Context {
this.graphManager = graphManager;
this.gremlinExecutor = gremlinExecutor;
this.scheduledExecutorService = scheduledExecutorService;
+
+ // order of calls matter as one depends on the next
+ this.gremlinArgument = requestMessage.getArgs().get(Tokens.ARGS_GREMLIN);
+ this.requestContentType = determineRequestContents();
+ this.requestTimeout = determineTimeout();
+ }
+
+ /**
+ * Gets the timeout for this request as it was resolved when this {@code Context} was constructed.
+ */
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ /**
+ * Gets the classification of the request's gremlin argument (bytecode, script or unknown) as it was determined
+ * when this {@code Context} was constructed.
+ */
+ public RequestContentType getRequestContentType() {
+ return requestContentType;
+ }
+
+ /**
+ * Gets the value found under {@link Tokens#ARGS_GREMLIN} in the arguments of the request message, which is
+ * {@code null} when the arguments hold no value for that key.
+ */
+ public Object getGremlinArgument() {
+ return gremlinArgument;
 }
public ScheduledExecutorService getScheduledExecutorService() {
@@ -133,4 +160,28 @@ public class Context {
}
}
+
+    /**
+     * Classifies the gremlin argument of the request by its runtime type.
+     */
+    private RequestContentType determineRequestContents() {
+        if (gremlinArgument instanceof Bytecode)
+            return RequestContentType.BYTECODE;
+
+        // anything that is neither bytecode nor a script string (including null) is reported as unknown
+        return gremlinArgument instanceof String ? RequestContentType.SCRIPT : RequestContentType.UNKNOWN;
+    }
+
+    /**
+     * Resolves the timeout to apply to this request. A timeout declared within a script takes precedence, followed
+     * by the {@link Tokens#ARGS_EVAL_TIMEOUT} request argument and finally the evaluation timeout from the settings.
+     */
+    private long determineTimeout() {
+        // start with the per-request override if the request carries one, otherwise use the configured value
+        final Map<String, Object> requestArgs = requestMessage.getArgs();
+        final long requestOrConfiguredTimeout;
+        if (requestArgs.containsKey(Tokens.ARGS_EVAL_TIMEOUT))
+            requestOrConfiguredTimeout = ((Number) requestArgs.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue();
+        else
+            requestOrConfiguredTimeout = settings.getEvaluationTimeout();
+
+        // only a script is checked for with(timeout) options, so any other content keeps the value from above
+        if (requestContentType != RequestContentType.SCRIPT)
+            return requestOrConfiguredTimeout;
+
+        final Optional<Long> scriptTimeout = GremlinScriptChecker.parse(gremlinArgument.toString()).getTimeout();
+        return scriptTimeout.isPresent() ? scriptTimeout.get() : requestOrConfiguredTimeout;
+    }
}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 2a1adf7..b322aa7 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -78,6 +78,7 @@ public class GremlinServer {
private final ExecutorService gremlinExecutorService;
private final ServerGremlinExecutor serverGremlinExecutor;
private final boolean isEpollEnabled;
+ private Channelizer channelizer;
/**
* Construct a Gremlin Server instance from {@link Settings}.
@@ -159,7 +160,7 @@ public class GremlinServer {
}
});
- final Channelizer channelizer = createChannelizer(settings);
+ channelizer = createChannelizer(settings);
channelizer.init(serverGremlinExecutor);
b.group(bossGroup, workerGroup)
.childHandler(channelizer);
@@ -284,8 +285,10 @@ public class GremlinServer {
}
try {
- if (gremlinExecutorService != null)
- gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS);
+ if (gremlinExecutorService != null) {
+ if (!gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS))
+ logger.warn("Gremlin thread pool did not fully terminate - continuing with shutdown process");
+ }
} catch (InterruptedException ie) {
logger.warn("Timeout waiting for Gremlin thread pool to shutdown - continuing with shutdown process.");
}
@@ -330,6 +333,10 @@ public class GremlinServer {
return serverGremlinExecutor;
}
+ public Channelizer getChannelizer() {
+ return channelizer;
+ }
+
public static void main(final String[] args) throws Exception {
// add to vm options: -Dlog4j.configuration=file:conf/log4j.properties
printHeader();
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 3a535ab..3f84c19 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
import org.apache.tinkerpop.gremlin.server.handler.AbstractAuthenticationHandler;
import org.apache.tinkerpop.gremlin.server.util.DefaultGraphManager;
@@ -197,6 +198,37 @@ public class Settings {
public String graphManager = DefaultGraphManager.class.getName();
/**
+ * Maximum number of parameters that can be passed on a request. Larger numbers may impact performance for scripts.
+ * The default is 16 and this setting only applies to the {@link UnifiedChannelizer}.
+ */
+ public int maxParameters = 16;
+
+ /**
+ * The time in milliseconds that a {@link UnifiedChannelizer} session can exist. The lifetime of a session cannot be
+ * extended beyond this value irrespective of the number of requests and their individual timeouts. Requests must
+ * complete within this time frame. The default is 10 minutes.
+ */
+ public long sessionLifetimeTimeout = 600000;
+
+ /**
+ * Enable the global function cache for sessions when using the {@link UnifiedChannelizer}. This setting is only
+ * relevant when {@link #useCommonEngineForSessions} is {@code false}. When {@code true} it means that
+ * functions created in one request to a session remain available on the next request to that session.
+ */
+ public boolean useGlobalFunctionCacheForSessions = true;
+
+ /**
+ * When {@code true} and using the {@link UnifiedChannelizer} the same engine that will be used to serve
+ * sessionless requests will also be used to serve sessions. The default value of {@code true} is recommended as
+ * it reduces the amount of object creation required for each session and should generally lead to better
+ * performance especially if the expectation is that there will be many sessions being created and destroyed
+ * rapidly. Setting this value to {@code false} is mostly present to support specific use cases that may require
+ * each session having its own engine or to match previous functionality provided by the older channelizers
+ * produced prior to 3.5.0.
+ */
+ public boolean useCommonEngineForSessions = true;
+
+ /**
* Configured metrics for Gremlin Server.
*/
public ServerMetrics metrics = null;
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
index 7b3ad99..be2a7af 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
@@ -62,7 +62,9 @@ public class HttpChannelizer extends AbstractChannelizer {
if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("http-io", LogLevel.DEBUG));
- pipeline.addLast(new HttpObjectAggregator(settings.maxContentLength));
+ final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
+ aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
+ pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
if (authenticator != null) {
// Cannot add the same handler instance multiple times unless
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
new file mode 100644
index 0000000..522baa2
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.channel;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
+import org.apache.tinkerpop.gremlin.server.handler.WsAndHttpChannelizerHandler;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
+
+/**
+ * A {@link Channelizer} that supports websocket and HTTP requests and does so with the most streamlined processing
+ * model for Gremlin Server introduced with 3.5.0.
+ */
+public class UnifiedChannelizer extends AbstractChannelizer {
+
+ // handler created in init() that configure() places into each pipeline
+ private WsAndHttpChannelizerHandler handler;
+ // handler created in init() that finalize() appends to each pipeline
+ private UnifiedHandler unifiedHandler;
+ // name under which the UnifiedHandler is registered in the pipeline
+ protected static final String PIPELINE_UNIFIED = "unified";
+
+ /**
+ * Initializes the parent channelizer and then creates the two handlers this class installs into pipelines: the
+ * {@link WsAndHttpChannelizerHandler}, which is given a new {@link HttpGremlinEndpointHandler}, and the
+ * {@link UnifiedHandler}.
+ */
+ @Override
+ public void init(final ServerGremlinExecutor serverGremlinExecutor) {
+ super.init(serverGremlinExecutor);
+ handler = new WsAndHttpChannelizerHandler();
+ handler.init(serverGremlinExecutor, new HttpGremlinEndpointHandler(serializers, gremlinExecutor, graphManager, settings));
+
+ // a single UnifiedHandler is created here and finalize() adds that same instance to each pipeline it is given.
+ // NOTE(review): this relies on the handler not holding per-channel state - confirm it is safe to share
+ unifiedHandler = new UnifiedHandler(settings, graphManager, gremlinExecutor, scheduledExecutorService, this);
+ }
+
+ /**
+ * Lets the {@link WsAndHttpChannelizerHandler} configure the pipeline and then inserts that handler directly
+ * after the handler registered as {@code PIPELINE_HTTP_REQUEST_DECODER}.
+ */
+ @Override
+ public void configure(final ChannelPipeline pipeline) {
+ handler.configure(pipeline);
+ pipeline.addAfter(PIPELINE_HTTP_REQUEST_DECODER, "WsAndHttpChannelizerHandler", handler);
+ }
+
+ /**
+ * Runs the parent finalization, removes the handlers registered as {@code PIPELINE_OP_SELECTOR} and
+ * {@code PIPELINE_OP_EXECUTOR} and appends the {@link UnifiedHandler} to the end of the pipeline in their place.
+ */
+ @Override
+ public void finalize(final ChannelPipeline pipeline) {
+ super.finalize(pipeline);
+ pipeline.remove(PIPELINE_OP_SELECTOR);
+ pipeline.remove(PIPELINE_OP_EXECUTOR);
+
+ pipeline.addLast(PIPELINE_UNIFIED, unifiedHandler);
+ }
+
+ /**
+ * Gets the {@link UnifiedHandler} created by {@link #init(ServerGremlinExecutor)}, which is {@code null} until
+ * that method has been called.
+ */
+ public UnifiedHandler getUnifiedHandler() {
+ return unifiedHandler;
+ }
+
+ /**
+ * Always returns {@code true}.
+ */
+ @Override
+ public boolean supportsIdleMonitor() {
+ return true;
+ }
+
+ /**
+ * Delegates creation of the idle detection message to the websocket channelizer held by the
+ * {@link WsAndHttpChannelizerHandler}.
+ */
+ @Override
+ public Object createIdleDetectionMessage() {
+ return handler.getWsChannelizer().createIdleDetectionMessage();
+ }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
index c4ff402..a1864fd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
@@ -81,8 +81,11 @@ public class WebSocketChannelizer extends AbstractChannelizer {
@Override
public void configure(final ChannelPipeline pipeline) {
+
if (logger.isDebugEnabled())
- pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG));
+ pipeline.addLast(new LoggingHandler("log-encoder-aggregator", LogLevel.DEBUG));
+
+ pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
logger.debug("HttpRequestDecoder settings - maxInitialLineLength={}, maxHeaderSize={}, maxChunkSize={}",
settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize);
@@ -95,12 +98,7 @@ public class WebSocketChannelizer extends AbstractChannelizer {
settings.maxContentLength, settings.maxAccumulationBufferComponents);
final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
- pipeline.addLast("aggregator", aggregator);
-
- if (logger.isDebugEnabled())
- pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));
-
- pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());
+ pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
// Add compression extension for WebSocket defined in https://tools.ietf.org/html/rfc7692
pipeline.addLast(PIPELINE_WEBSOCKET_SERVER_COMPRESSION, new WebSocketServerCompressionHandler());
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
index 026ad59..074e4ab 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractAuthenticationHandler.java
@@ -21,15 +21,25 @@ package org.apache.tinkerpop.gremlin.server.handler;
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
/**
* Provides an abstraction point to allow for http auth schemes beyond basic auth.
*/
public abstract class AbstractAuthenticationHandler extends ChannelInboundHandlerAdapter {
protected final Authenticator authenticator;
+ protected final Authorizer authorizer;
+ /**
+ * @deprecated As of release 3.5.0, replaced by {@link #AbstractAuthenticationHandler(Authenticator, Authorizer)}.
+ */
+ @Deprecated
public AbstractAuthenticationHandler(final Authenticator authenticator) {
- this.authenticator = authenticator;
+ this(authenticator, null);
}
+ public AbstractAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer) {
+ this.authenticator = authenticator;
+ this.authorizer = authorizer;
+ }
}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
new file mode 100644
index 0000000..7f7c51c
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractRexster.java
@@ -0,0 +1,719 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import groovy.lang.GroovyRuntimeException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
+import org.apache.tinkerpop.gremlin.server.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.codehaus.groovy.control.MultipleCompilationErrorsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+/**
+ * A base implementation of {@link Rexster} which offers some common functionality that matches typical Gremlin Server
+ * request response expectations for script, bytecode and graph operations. The class is designed to be extended but
+ * take care in understanding the way that different methods are called as they do depend on one another a bit. It
+ * may be best to examine the source code to determine how best to use this class or to extend from the higher order
+ * classes of {@link SingleRexster} or {@link MultiRexster}.
+ */
+public abstract class AbstractRexster implements Rexster, AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractRexster.class);
+ private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
+
+ private final Channel initialChannel;
+ private final boolean transactionManaged;
+ private final String sessionId;
+ private final AtomicReference<ScheduledFuture<?>> sessionCancelFuture = new AtomicReference<>();
+ private final AtomicReference<Future<?>> sessionFuture = new AtomicReference<>();
+ private long actualTimeoutLength = 0;
+ private boolean actualTimeoutCausedBySession = false;
+ protected final GraphManager graphManager;
+ protected final ConcurrentMap<String, Rexster> sessions;
+ protected final Set<String> aliasesUsedByRexster = new HashSet<>();
+
+    /**
+     * Creates the {@code Rexster} and ties its lifetime to the channel of the request that initiated it, such that
+     * the session future is cancelled and {@link #close()} is called when that channel closes.
+     *
+     * @param gremlinContext the context of the initiating request, which supplies the channel and the
+     *                       {@link GraphManager}
+     * @param sessionId the identifier returned by {@link #getSessionId()}
+     * @param transactionManaged the flag returned by {@link #isTransactionManaged()}
+     * @param sessions the map of {@link Rexster} instances held by this instance - presumably keyed by session
+     *                 identifier, to be confirmed against the caller
+     */
+    AbstractRexster(final Context gremlinContext, final String sessionId,
+                    final boolean transactionManaged, final ConcurrentMap<String, Rexster> sessions) {
+        this.transactionManaged = transactionManaged;
+        this.sessionId = sessionId;
+        this.sessions = sessions;
+        this.graphManager = gremlinContext.getGraphManager();
+        this.initialChannel = gremlinContext.getChannelHandlerContext().channel();
+
+        // close Rexster if the channel closes to cleanup and close transactions. the listener is registered only
+        // after every field has been assigned because a listener added to a future that has already completed may
+        // be notified right away, in which case close() would otherwise run against a partially constructed instance
+        this.initialChannel.closeFuture().addListener(f -> {
+            // cancel session worker or it will keep waiting for items to appear in the session queue
+            final Future<?> sf = sessionFuture.get();
+            if (sf != null && !sf.isDone()) {
+                sf.cancel(true);
+            }
+            close();
+        });
+    }
+
+ /**
+ * Gets the {@code transactionManaged} flag that was supplied at construction.
+ */
+ public boolean isTransactionManaged() {
+ return transactionManaged;
+ }
+
+ /**
+ * Gets the session identifier that was supplied at construction.
+ */
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ /**
+ * Determines if the supplied channel is the very same instance as the channel of the request that this
+ * {@code Rexster} was constructed with. The comparison is by reference.
+ */
+ public boolean isBoundTo(final Channel channel) {
+ return channel == initialChannel;
+ }
+
+ /**
+ * Gets the timeout value recorded by {@link #triggerTimeout(long, boolean)} or {@code 0} if none was recorded.
+ */
+ public long getActualTimeoutLength() {
+ return actualTimeoutLength;
+ }
+
+ /**
+ * Gets the {@code causedBySession} flag recorded by {@link #triggerTimeout(long, boolean)} or {@code false} if
+ * none was recorded.
+ */
+ public boolean isActualTimeoutCausedBySession() {
+ return actualTimeoutCausedBySession;
+ }
+
+ /**
+ * Looks up a script engine by name using the script engine manager of the {@code GremlinExecutor} held by
+ * the supplied {@link Context}.
+ *
+ * @param context the context supplying the {@code GremlinExecutor}
+ * @param language the engine name handed to {@code getEngineByName()}
+ */
+ public GremlinScriptEngine getScriptEngine(final Context context, final String language) {
+ return context.getGremlinExecutor().getScriptEngineManager().getEngineByName(language);
+ }
+
+    /**
+     * Registers the future that {@link #close()} cancels. It may be assigned only once.
+     *
+     * @throws IllegalStateException if a future was already registered
+     */
+    @Override
+    public void setSessionCancelFuture(final ScheduledFuture<?> f) {
+        final boolean assigned = sessionCancelFuture.compareAndSet(null, f);
+        if (assigned) {
+            return;
+        }
+        throw new IllegalStateException("Session cancellation future is already set");
+    }
+
+    /**
+     * Registers the future that is cancelled on timeout or when the channel closes. It may be assigned only once.
+     *
+     * @throws IllegalStateException if a future was already registered
+     */
+    @Override
+    public void setSessionFuture(final Future<?> f) {
+        final boolean assigned = sessionFuture.compareAndSet(null, f);
+        if (assigned) {
+            return;
+        }
+        throw new IllegalStateException("Session future is already set");
+    }
+
+    /**
+     * Cancels the registered session future with interruption and records why. Does nothing if no future has been
+     * registered yet or if it is already done.
+     *
+     * @param timeout the timeout length to report in the resulting error message
+     * @param causedBySession {@code true} to report the timeout as a session lifetime timeout
+     */
+    @Override
+    public synchronized void triggerTimeout(final long timeout, final boolean causedBySession) {
+        final Future<?> worker = sessionFuture.get();
+        if (worker == null || worker.isDone()) {
+            return;
+        }
+
+        // remember the cause first so that it is available when the interruption is turned into an error.
+        // cancelling stops the Rexster Runnable which will end in close() for final cleanup
+        actualTimeoutCausedBySession = causedBySession;
+        actualTimeoutLength = timeout;
+        worker.cancel(true);
+    }
+
+    /**
+     * Executes the Gremlin held in the request of the supplied {@link Context}, either as {@link Bytecode} or as a
+     * script, writes the audit log entry and streams any results back to the client. Every failure is routed
+     * through {@link #handleException(Context, Throwable)}.
+     *
+     * @throws RexsterException carrying the error response for the client when anything goes wrong
+     */
+    protected void process(final Context gremlinContext) throws RexsterException {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final Object gremlinToExecute = msg.getArgs().get(Tokens.ARGS_GREMLIN);
+
+        // strict transaction management commits/rolls back only the aliases that were actually used, so they
+        // are collected here for use on close()
+        if (gremlinContext.getSettings().strictTransactionManagement) {
+            msg.optionalArgs(Tokens.ARGS_ALIASES).ifPresent(m -> {
+                aliasesUsedByRexster.addAll(((Map<String,String>) m).values());
+            });
+        }
+
+        try {
+            // Bytecode may be a "graph operation" instead of a Traversal in which case there is nothing to
+            // iterate, therefore the iterator is optional
+            final Optional<Iterator<?>> itty;
+            if (gremlinToExecute instanceof Bytecode) {
+                itty = fromBytecode(gremlinContext, (Bytecode) gremlinToExecute);
+            } else {
+                itty = Optional.of(fromScript(gremlinContext, (String) gremlinToExecute));
+            }
+
+            processAuditLog(gremlinContext.getSettings(), gremlinContext.getChannelHandlerContext(), gremlinToExecute);
+
+            if (itty.isPresent()) {
+                handleIterator(gremlinContext, itty.get());
+            }
+        } catch (Exception ex) {
+            handleException(gremlinContext, ex);
+        }
+    }
+
+ /**
+ * Translates a failure raised while processing a request into a {@code RexsterException} that carries the
+ * {@link ResponseMessage} to send to the client. This method never returns normally. The checks are applied
+ * in the following order:
+ * <ol>
+ * <li>a {@code RexsterException} is rethrown unchanged</li>
+ * <li>a {@code TemporaryException} anywhere in the cause chain yields {@code SERVER_ERROR_TEMPORARY}</li>
+ * <li>a root cause that is a script timeout or an interruption yields {@code SERVER_ERROR_TIMEOUT}</li>
+ * <li>a root cause that is a compilation, verification or script error yields {@code SERVER_ERROR_EVALUATION}</li>
+ * <li>anything else yields {@code SERVER_ERROR}</li>
+ * </ol>
+ *
+ * @param gremlinContext the context of the failed request, used to build the response
+ * @param t the failure to translate
+ * @throws RexsterException always
+ */
+ protected void handleException(final Context gremlinContext, final Throwable t) throws RexsterException {
+ // already in the form expected by callers so pass it through
+ if (t instanceof RexsterException) throw (RexsterException) t;
+
+ final Optional<Throwable> possibleTemporaryException = determineIfTemporaryException(t);
+ if (possibleTemporaryException.isPresent()) {
+ final Throwable temporaryException = possibleTemporaryException.get();
+ throw new RexsterException(temporaryException.getMessage(), t,
+ ResponseMessage.build(gremlinContext.getRequestMessage())
+ .code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
+ .statusMessage(temporaryException.getMessage())
+ .statusAttributeException(temporaryException).create());
+ }
+
+ // the remaining checks classify the failure by its root cause rather than by the outermost exception
+ final Throwable root = ExceptionUtils.getRootCause(t);
+
+ if (root instanceof TimedInterruptTimeoutException) {
+ // occurs when the TimedInterruptCustomizerProvider is in play
+ final String msg = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider",
+ gremlinContext.getRequestMessage().getRequestId());
+ throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+ .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+ .statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider")
+ .create());
+ }
+
+ if (root instanceof TimeoutException) {
+ final String errorMessage = String.format("Script evaluation exceeded the configured threshold for request [%s]",
+ gremlinContext.getRequestMessage().getRequestId());
+ // note that the status message sent to the client is the message of the outermost exception
+ throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+ .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+ .statusMessage(t.getMessage())
+ .create());
+ }
+
+ // interruptions are reported as timeouts using the values recorded by triggerTimeout()
+ if (root instanceof InterruptedException ||
+ root instanceof TraversalInterruptedException ||
+ root instanceof InterruptedIOException) {
+ final String msg = actualTimeoutCausedBySession ?
+ String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLength) :
+ String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLength);
+ throw new RexsterException(msg, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+ .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+ .statusMessage(msg).create());
+ }
+
+ // a single "Method too large" compilation error gets a dedicated message with the script trimmed
+ if (root instanceof MultipleCompilationErrorsException && root.getMessage().contains("Method too large") &&
+ ((MultipleCompilationErrorsException) root).getErrorCollector().getErrorCount() == 1) {
+ final String errorMessage = String.format("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", trimMessage(gremlinContext.getRequestMessage()));
+ logger.warn(errorMessage);
+ throw new RexsterException(errorMessage, root, ResponseMessage.build(gremlinContext.getRequestMessage())
+ .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+ .statusMessage(errorMessage)
+ .statusAttributeException(root).create());
+ }
+
+ // GroovyRuntimeException will hit a pretty wide range of eval type errors, like MissingPropertyException,
+ // CompilationFailedException, MissingMethodException, etc. If more specific handling is required then
+ // try to catch it earlier above.
+ if (root instanceof GroovyRuntimeException ||
+ root instanceof VerificationException ||
+ root instanceof ScriptException) {
+ throw new RexsterException(root.getMessage(), root, ResponseMessage.build(gremlinContext.getRequestMessage())
+ .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+ .statusMessage(root.getMessage())
+ .statusAttributeException(root).create());
+ }
+
+ // fallback for anything not classified above
+ throw new RexsterException(root.getClass().getSimpleName() + ": " + root.getMessage(), root,
+ ResponseMessage.build(gremlinContext.getRequestMessage())
+ .code(ResponseStatusCode.SERVER_ERROR)
+ .statusAttributeException(root)
+ .statusMessage(root.getMessage()).create());
+ }
+
+    /**
+     * Used to decrease the size of a Gremlin script that triggered a "method too large" exception so that it
+     * doesn't log a massive text string nor return a large error message. The script is cut to its first 1021
+     * characters followed by "..." and is left untouched if it is absent or not longer than that.
+     *
+     * @param msg the request whose script should be shortened - it is copied and not modified
+     * @return a copy of the request with the shortened script
+     */
+    private RequestMessage trimMessage(final RequestMessage msg) {
+        final RequestMessage trimmedMsg = RequestMessage.from(msg).create();
+        final Object gremlin = trimmedMsg.getArgs().get(Tokens.ARGS_GREMLIN);
+        if (gremlin != null) {
+            final String script = gremlin.toString();
+
+            // substring() throws if the end index exceeds the length, so only trim what is actually too long
+            if (script.length() > 1021)
+                trimmedMsg.getArgs().put(Tokens.ARGS_GREMLIN, script.substring(0, 1021) + "...");
+        }
+
+        return trimmedMsg;
+    }
+
+    /**
+     * Searches the exception chain of the supplied failure for a {@code TemporaryException} so that the response
+     * can carry the error code that tells the client to retry.
+     *
+     * @return the first {@code TemporaryException} found in the chain or an empty {@code Optional}
+     */
+    protected Optional<Throwable> determineIfTemporaryException(final Throwable ex) {
+        for (final Throwable candidate : ExceptionUtils.getThrowables(ex)) {
+            if (candidate instanceof TemporaryException) {
+                return Optional.of(candidate);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes this instance from the session registry and cancels the registered session cancellation future if
+     * it has not completed. Calls made after the registry entry is gone return immediately.
+     */
+    @Override
+    public synchronized void close() {
+        // the registry entry doubles as the marker for "not yet closed"
+        final boolean stillRegistered = sessions.containsKey(sessionId);
+        if (!stillRegistered) {
+            return;
+        }
+
+        sessions.remove(sessionId);
+
+        final ScheduledFuture<?> cancellation = sessionCancelFuture.get();
+        if (cancellation != null && !cancellation.isDone()) {
+            cancellation.cancel(true);
+        }
+    }
+
+    /**
+     * Evaluates a script with the engine named by the request's language argument ("gremlin-groovy" when that
+     * argument is absent) and exposes the result as an {@code Iterator}.
+     *
+     * @param gremlinContext the context holding the request and the script engines
+     * @param script the script to evaluate
+     */
+    protected Iterator<?> fromScript(final Context gremlinContext, final String script) throws Exception {
+        final Map<String, Object> requestArgs = gremlinContext.getRequestMessage().getArgs();
+        final String language = (String) requestArgs.getOrDefault(Tokens.ARGS_LANGUAGE, "gremlin-groovy");
+
+        // the engine is resolved before the bindings are assembled
+        final GremlinScriptEngine engine = getScriptEngine(gremlinContext, language);
+        final Bindings bindings = mergeBindingsFromRequest(gremlinContext, getWorkerBindings());
+        return IteratorUtils.asIterator(engine.eval(script, bindings));
+    }
+
+ protected Optional<Iterator<?>> fromBytecode(final Context gremlinContext, final Bytecode bytecode) throws Exception {
+ final RequestMessage msg = gremlinContext.getRequestMessage();
+
+ final Traversal.Admin<?, ?> traversal;
+ final Map<String, String> aliases = (Map<String, String>) msg.optionalArgs(Tokens.ARGS_ALIASES).get();
+ final GraphManager graphManager = gremlinContext.getGraphManager();
+ final String traversalSourceName = aliases.entrySet().iterator().next().getValue();
+ final TraversalSource g = graphManager.getTraversalSource(traversalSourceName);
+
+ // handle bytecode based graph operations like commit/rollback commands
+ if (BytecodeHelper.isGraphOperation(bytecode)) {
+ handleGraphOperation(gremlinContext, bytecode, g.getGraph());
+ return Optional.empty();
+ } else {
+
+ final Optional<String> lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
+ if (!lambdaLanguage.isPresent())
+ traversal = JavaTranslator.of(g).translate(bytecode);
+ else {
+ final SimpleBindings bindings = new SimpleBindings();
+ bindings.put(traversalSourceName, g);
+ traversal = gremlinContext.getGremlinExecutor().getScriptEngineManager().getEngineByName(lambdaLanguage.get()).eval(bytecode, bindings, traversalSourceName);
+ }
+
+ // compile the traversal - without it getEndStep() has nothing in it
+ traversal.applyStrategies();
+
+ return Optional.of(new TraverserIterator(traversal));
+ }
+ }
+
+ /**
+ * Creates the base bindings for a script evaluation as a new {@code SimpleBindings} built from
+ * {@code GraphManager.getAsBindings()}.
+ */
+ protected Bindings getWorkerBindings() throws RexsterException {
+ return new SimpleBindings(graphManager.getAsBindings());
+ }
+
+    /**
+     * Adds the aliases and the bindings of the request to the supplied bindings. Each alias is bound to the
+     * {@code Graph} of that name or, failing that, to the {@code TraversalSource} of that name. Bindings given on
+     * the request are applied last so that they override everything else.
+     *
+     * @param gremlinContext the context holding the request, the settings and the {@link GraphManager}
+     * @param bindings the bindings to add to, which is also the object returned
+     * @return the supplied {@code bindings}
+     * @throws RexsterException if an alias refers to nothing known or if aliases are missing while
+     * {@code strictTransactionManagement} is enabled
+     */
+    protected Bindings mergeBindingsFromRequest(final Context gremlinContext, final Bindings bindings) throws RexsterException {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+
+        if (!msg.getArgs().containsKey(Tokens.ARGS_ALIASES)) {
+            // there's no aliases so determine if that's ok with Gremlin Server
+            if (gremlinContext.getSettings().strictTransactionManagement) {
+                final String error = "Gremlin Server is configured with strictTransactionManagement as 'true' - the 'aliases' arguments must be provided";
+                throw new RexsterException(error, ResponseMessage.build(msg)
+                        .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+            }
+        } else {
+            // alias any global bindings to a different variable.
+            final Map<String, String> aliases = (Map<String, String>) msg.getArgs().get(Tokens.ARGS_ALIASES);
+            for (Map.Entry<String,String> aliasKv : aliases.entrySet()) {
+                final String alias = aliasKv.getKey();
+                final String target = aliasKv.getValue();
+
+                // a Graph of that name takes precedence
+                final Graph graph = gremlinContext.getGraphManager().getGraph(target);
+                if (null != graph) {
+                    bindings.put(alias, graph);
+                    continue;
+                }
+
+                // not a Graph so perhaps it is a TraversalSource
+                final TraversalSource ts = gremlinContext.getGraphManager().getTraversalSource(target);
+                if (null != ts) {
+                    bindings.put(alias, ts);
+                    continue;
+                }
+
+                // it needs to be something - this validation is important to calls to GraphManager.commit()
+                // and rollback() as they both expect that the aliases supplied are valid
+                final String error = String.format("Could not alias [%s] to [%s] as [%s] not in the Graph or TraversalSource global bindings",
+                        alias, target, target);
+                throw new RexsterException(error, ResponseMessage.build(msg)
+                        .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
+            }
+        }
+
+        // add any bindings to override any other supplied
+        Optional.ofNullable((Map<String, Object>) msg.getArgs().get(Tokens.ARGS_BINDINGS)).ifPresent(bindings::putAll);
+        return bindings;
+    }
+
+ /**
+ * Provides a generic way of iterating a result set back to the client. Results are sent in batches and, when
+ * transactions are managed for the request, the transaction is committed once everything has been iterated and
+ * serialized or rolled back if the channel becomes inactive or serialization fails.
+ *
+ * @param gremlinContext The Gremlin Server {@link Context} object containing settings, request message, etc.
+ * @param itty The result to iterate
+ * @throws InterruptedException if the executing thread is interrupted while iterating or while waiting for the
+ * client to catch up
+ */
+ protected void handleIterator(final Context gremlinContext, final Iterator<?> itty) throws InterruptedException {
+ final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+ final RequestMessage msg = gremlinContext.getRequestMessage();
+ final Settings settings = gremlinContext.getSettings();
+ boolean warnOnce = false;
+
+ // sessionless requests are always transaction managed, but in-session requests are configurable.
+ final boolean managedTransactionsForRequest = transactionManaged ?
+ true : (Boolean) msg.getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
+
+ // we have an empty iterator - happens on stuff like: g.V().iterate()
+ if (!itty.hasNext()) {
+ final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,ResponseStatusCode.NO_CONTENT, itty);
+ // as there is nothing left to iterate if we are transaction managed then we should execute a
+ // commit here before we send back a NO_CONTENT which implies success
+ if (managedTransactionsForRequest)
+ closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+ gremlinContext.writeAndFlush(ResponseMessage.build(msg)
+ .code(ResponseStatusCode.NO_CONTENT)
+ .statusAttributes(attributes)
+ .create());
+ return;
+ }
+
+ // the batch size can be overridden by the request
+ final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+ .orElse(settings.resultIterationBatchSize);
+ List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+ // use an external control to manage the loop as opposed to just checking hasNext() in the while. this
+ // prevents situations where auto transactions create a new transaction after calls to commit() within
+ // the loop on calls to hasNext().
+ boolean hasMore = itty.hasNext();
+
+ while (hasMore) {
+ if (Thread.interrupted()) throw new InterruptedException();
+
+ // check if an implementation needs to force flush the aggregated results before the iteration batch
+ // size is reached.
+ // todo: what implementation does this?! can we kill it going forward - seems always false
+ // final boolean forceFlush = isForceFlushed(nettyContext, msg, itty);
+ final boolean forceFlush = false;
+
+ // have to check the aggregate size because it is possible that the channel is not writeable (below)
+ // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
+ // the expected resultIterationBatchSize. Total serialization time for the response remains in
+ // effect so if the client is "slow" it may simply timeout.
+ //
+ // there is a need to check hasNext() on the iterator because if the channel is not writeable the
+ // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
+ // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
+ // require a forced flush which can be forced by sub-classes.
+ //
+ // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
+ // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
+ // while waiting for the client to catch up
+ if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
+
+ // Don't keep executor busy if client has already given up; there is no way to catch up if the channel is
+ // not active, and hence we should break the loop.
+ if (!nettyContext.channel().isActive()) {
+ if (managedTransactionsForRequest) {
+ closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+ }
+ break;
+ }
+
+ // send back a page of results if batch size is met or if it's the end of the results being iterated.
+ // also check writeability of the channel to prevent OOME for slow clients.
+ //
+ // clients might decide to close the Netty channel to the server with a CloseWebsocketFrame after errors
+ // like CorruptedFrameException. On the server, although the channel gets closed, there might be some
+ // executor threads waiting for watermark to clear which will not clear in these cases since client has
+ // already given up on these requests. This leads to these executors waiting for the client to consume
+ // results till the timeout. checking for isActive() should help prevent that.
+ if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) {
+ if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
+ final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+ Frame frame = null;
+ try {
+ frame = makeFrame(gremlinContext, aggregate, code, itty);
+ } catch (Exception ex) {
+ // a frame may use a Bytebuf which is a countable release - if it does not get written
+ // downstream it needs to be released here
+ // NOTE(review): frame is still null at this point because makeFrame() threw before the
+ // assignment completed, so this release never executes
+ if (frame != null) frame.tryRelease();
+
+ // exception is handled in makeFrame() - serialization error gets written back to driver
+ // at that point
+ if (managedTransactionsForRequest)
+ closeTransaction(gremlinContext, Transaction.Status.ROLLBACK);
+ break;
+ }
+
+ // track whether there is anything left in the iterator because it needs to be accessed after
+ // the transaction could be closed - in that case a call to hasNext() could open a new transaction
+ // unintentionally
+ final boolean moreInIterator = itty.hasNext();
+
+ try {
+ // only need to reset the aggregation list if there's more stuff to write
+ if (moreInIterator)
+ aggregate = new ArrayList<>(resultIterationBatchSize);
+ else {
+ // iteration and serialization are both complete which means this finished successfully. note that
+ // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
+ // local errors will get rolledback below because the exceptions aren't thrown in those cases to be
+ // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
+ // there are no more items to iterate and serialization is complete
+ if (managedTransactionsForRequest)
+ closeTransaction(gremlinContext, Transaction.Status.COMMIT);
+
+ // exit the result iteration loop as there are no more results left. using this external control
+ // because of the above commit. some graphs may open a new transaction on the call to
+ // hasNext()
+ hasMore = false;
+ }
+ } catch (Exception ex) {
+ // a frame may use a Bytebuf which is a countable release - if it does not get written
+ // downstream it needs to be released here
+ if (frame != null) frame.tryRelease();
+ throw ex;
+ }
+
+ if (!moreInIterator) iterateComplete(gremlinContext, itty);
+
+ // the flush is called after the commit has potentially occurred. in this way, if a commit was
+ // required then it will be 100% complete before the client receives it. the "frame" at this point
+ // should have completely detached objects from the transaction (i.e. serialization has occurred)
+ // so a new one should not be opened on the flush down the netty pipeline
+ gremlinContext.writeAndFlush(code, frame);
+ }
+ } else {
+ // don't keep triggering this warning over and over again for the same request
+ if (!warnOnce) {
+ logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
+ warnOnce = true;
+ }
+
+ // since the client is lagging we can hold here for a period of time for the client to catch up.
+ // this isn't blocking the IO thread - just a worker.
+ TimeUnit.MILLISECONDS.sleep(10);
+ }
+ }
+ }
+
+ /**
+ * If {@link Bytecode} is detected to contain a "graph operation" then it gets processed by this method.
+ */
+ protected void handleGraphOperation(final Context gremlinContext, final Bytecode bytecode, final Graph graph) throws Exception {
+ final RequestMessage msg = gremlinContext.getRequestMessage();
+ if (graph.features().graph().supportsTransactions()) {
+ if (bytecode.equals(Bytecode.TX_COMMIT) || bytecode.equals(Bytecode.TX_ROLLBACK)) {
+ final boolean commit = bytecode.equals(Bytecode.TX_COMMIT);
+ closeTransaction(gremlinContext, commit ? Transaction.Status.COMMIT : Transaction.Status.ROLLBACK);
+
+ // write back a no-op for success
+ final Map<String, Object> attributes = generateStatusAttributes(gremlinContext,
+ ResponseStatusCode.NO_CONTENT, Collections.emptyIterator());
+ gremlinContext.writeAndFlush(ResponseMessage.build(msg)
+ .code(ResponseStatusCode.NO_CONTENT)
+ .statusAttributes(attributes)
+ .create());
+ } else {
+ throw new IllegalStateException(String.format(
+ "Bytecode in request is not a recognized graph operation: %s", bytecode.toString()));
+ }
+ }
+ }
+
+ /**
+ * Called when iteration within {@link #handleIterator(Context, Iterator)} is on its final pass and the final
+ * frame is about to be sent back to the client. This method only gets called on successful iteration of the
+ * entire result. The default implementation is empty and is meant to be overridden where needed.
+ *
+ * @param gremlinContext the context of the request being iterated
+ * @param itty the iterator that was just exhausted
+ */
+ protected void iterateComplete(final Context gremlinContext, final Iterator<?> itty) {
+ // do nothing by default
+ }
+
+    /**
+     * Builds the status meta-data for a {@link ResponseMessage}. The meta-data is only supplied with the last
+     * message of a result, so an empty map is returned while the iterator still has items.
+     *
+     * @param gremlinContext the context supplying the channel whose remote address is reported
+     * @param code the status code of the response - not used by this implementation
+     * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+     *             this method
+     * @return an empty map or a map holding the remote address under {@code Tokens.ARGS_HOST}
+     */
+    protected Map<String, Object> generateStatusAttributes(final Context gremlinContext,
+                                                           final ResponseStatusCode code, final Iterator<?> itty) {
+        final boolean lastMessage = !itty.hasNext();
+        if (!lastMessage) {
+            return Collections.emptyMap();
+        }
+
+        final Map<String, Object> metaData = new HashMap<>();
+        final String host = gremlinContext.getChannelHandlerContext().channel().remoteAddress().toString();
+        metaData.put(Tokens.ARGS_HOST, host);
+        return metaData;
+    }
+
+ /**
+ * Generates response result meta-data to put on a {@link ResponseMessage}. The default implementation
+ * returns an empty map and does not use any of its arguments.
+ *
+ * @param gremlinContext the context of the request being responded to
+ * @param code the status code of the response
+ * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
+ * this method
+ */
+ protected Map<String, Object> generateResponseMetaData(final Context gremlinContext,
+ final ResponseStatusCode code, final Iterator<?> itty) {
+ return Collections.emptyMap();
+ }
+
+    /**
+     * Serializes a batch of results into a {@code Frame} using the serializer attached to the channel, either
+     * as binary or as text depending on the {@code StateKey.USE_BINARY} attribute of the channel. If that fails
+     * a {@code SERVER_ERROR_SERIALIZATION} response is written to the client before the exception is rethrown.
+     *
+     * @param gremlinContext the context supplying the request and the channel
+     * @param aggregate the batch of results to serialize
+     * @param code the status code for the response
+     * @param itty the iterator of results, handed to the meta-data generators
+     * @throws Exception the failure raised while building or serializing the response
+     */
+    protected Frame makeFrame(final Context gremlinContext, final List<Object> aggregate,
+                              final ResponseStatusCode code, final Iterator<?> itty) throws Exception {
+        final RequestMessage msg = gremlinContext.getRequestMessage();
+        final ChannelHandlerContext nettyContext = gremlinContext.getChannelHandlerContext();
+        final MessageSerializer serializer = nettyContext.channel().attr(StateKey.SERIALIZER).get();
+        final boolean useBinary = nettyContext.channel().attr(StateKey.USE_BINARY).get();
+
+        final Map<String, Object> responseMetaData = generateResponseMetaData(gremlinContext, code, itty);
+        final Map<String, Object> statusAttributes = generateStatusAttributes(gremlinContext, code, itty);
+        try {
+            final ResponseMessage response = ResponseMessage.build(msg)
+                    .code(code)
+                    .statusAttributes(statusAttributes)
+                    .responseMetaData(responseMetaData)
+                    .result(aggregate).create();
+
+            if (!useBinary) {
+                // the expectation is that the GremlinTextRequestDecoder will have placed a MessageTextSerializer
+                // instance on the channel.
+                final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
+                return new Frame(textSerializer.serializeResponseAsString(response));
+            }
+
+            return new Frame(serializer.serializeResponseAsBinary(response, nettyContext.alloc()));
+        } catch (Exception ex) {
+            logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, msg.getRequestId(), ex);
+
+            // tell the client what happened before handing the failure back to the caller
+            final String errorMessage = String.format("Error during serialization: %s", ExceptionHelper.getMessageFromExceptionOrCause(ex));
+            gremlinContext.writeAndFlush(ResponseMessage.build(msg.getRequestId())
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(ex)
+                    .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create());
+            throw ex;
+        }
+    }
+
+ /**
+ * Called right before a transaction starts within {@link #run()}. The default implementation rolls back all
+ * graphs through the {@link GraphManager} and does not use the supplied context.
+ *
+ * @param gremlinContext the context of the request about to be processed
+ */
+ protected void startTransaction(final Context gremlinContext) {
+ // check if transactions are open and rollback first to ensure a fresh start.
+ graphManager.rollbackAll();
+ }
+
+ /**
+ * Close the transaction without a {@link Context} which supplies {@code null} to that argument for
+ * {@link #closeTransaction(Context, Transaction.Status)}. This method is idempotent.
+ *
+ * @param status either {@code COMMIT} or {@code ROLLBACK}
+ */
+ protected void closeTransaction(final Transaction.Status status) {
+ closeTransaction(null, status);
+ }
+
+ /**
+ * Tries to close the transaction but will catch exceptions and log them. This method is idempotent. It
+ * delegates to {@link #closeTransactionSafely(Context, Transaction.Status)} with a {@code null} context.
+ *
+ * @param status either {@code COMMIT} or {@code ROLLBACK}
+ */
+ protected void closeTransactionSafely(final Transaction.Status status) {
+ closeTransactionSafely(null, status);
+ }
+
+ /**
+ * Tries to close the transaction but will catch exceptions and log them. This method is idempotent.
+ *
+ * @param gremlinContext the context of the current request, which may be {@code null}
+ * @param status either {@code COMMIT} or {@code ROLLBACK}
+ */
+ protected void closeTransactionSafely(final Context gremlinContext, final Transaction.Status status) {
+ try {
+ closeTransaction(gremlinContext, status);
+ } catch (Exception ex) {
+ // deliberately not rethrown - the failure is only logged
+ logger.error("Failed to close transaction", ex);
+ }
+ }
+
+    /**
+     * Commits or rolls back the current transaction. With a non-null {@link Context} whose settings enable
+     * {@link Settings#strictTransactionManagement} only the graphs behind the aliases used so far are affected.
+     * Otherwise, which includes a {@code null} context, all graphs are committed or rolled back. Passing
+     * {@code null} therefore bypasses "strict" and must be done with care, though it is useful to ensure that
+     * all transactions are cleaned up when multiple graphs are referenced. Prefer calling
+     * {@link #closeTransaction(Transaction.Status)} for that purpose. This method is idempotent.
+     *
+     * @param gremlinContext the context of the current request or {@code null} to bypass "strict"
+     * @param status either {@code COMMIT} or {@code ROLLBACK}
+     * @throws IllegalStateException if {@code status} is anything other than {@code COMMIT} or {@code ROLLBACK}
+     */
+    protected void closeTransaction(final Context gremlinContext, final Transaction.Status status) {
+        final boolean commit;
+        if (status == Transaction.Status.COMMIT) {
+            commit = true;
+        } else if (status == Transaction.Status.ROLLBACK) {
+            commit = false;
+        } else {
+            throw new IllegalStateException(String.format("Transaction.Status not supported: %s", status));
+        }
+
+        final boolean strict = gremlinContext != null && gremlinContext.getSettings().strictTransactionManagement;
+        if (!strict) {
+            if (commit) {
+                graphManager.commitAll();
+            } else {
+                graphManager.rollbackAll();
+            }
+            return;
+        }
+
+        // hand over a copy so that the tracked aliases are not exposed to the GraphManager
+        final Set<String> aliases = new HashSet<>(aliasesUsedByRexster);
+        if (commit) {
+            graphManager.commit(aliases);
+        } else {
+            graphManager.rollback(aliases);
+        }
+    }
+
+ private void processAuditLog(final Settings settings, final ChannelHandlerContext ctx, final Object gremlinToExecute) {
+ if (settings.enableAuditLog) {
+ AuthenticatedUser user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get();
+ if (null == user) { // This is expected when using the AllowAllAuthenticator
+ user = AuthenticatedUser.ANONYMOUS_USER;
+ }
+ String address = ctx.channel().remoteAddress().toString();
+ if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+ auditLogger.info("User {} with address {} requested: {}", user.getName(), address, gremlinToExecute);
+ }
+
+ if (settings.authentication.enableAuditLog) {
+ String address = ctx.channel().remoteAddress().toString();
+ if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
+ auditLogger.info("User with address {} requested: {}", address, gremlinToExecute);
+ }
+ }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
index a282874..a050ab0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthenticationHandler.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +55,16 @@ public class HttpBasicAuthenticationHandler extends AbstractAuthenticationHandle
private final Base64.Decoder decoder = Base64.getUrlDecoder();
+ /**
+ * @deprecated As of release 3.5.0, replaced by {@link #HttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+ */
+ @Deprecated
public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
- super(authenticator);
+ this(authenticator, null, settings);
+ }
+
+ public HttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+ super(authenticator, authorizer);
this.settings = settings;
}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
new file mode 100644
index 0000000..539c16b
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiRexster.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
+import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngineManager;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.Bindings;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED;
+
+/**
+ * A {@link Rexster} implementation that queues tasks given to it and executes them in a serial fashion within the
+ * same thread which thus allows multiple tasks to be executed in the same transaction.
+ */
+public class MultiRexster extends AbstractRexster {
+ private static final Logger logger = LoggerFactory.getLogger(MultiRexster.class);
+ protected final BlockingQueue<Context> queue = new LinkedBlockingQueue<>();
+ private ScheduledFuture<?> requestCancelFuture;
+ private Bindings bindings;
+ private final AtomicBoolean ending = new AtomicBoolean(false);
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final GremlinScriptEngineManager scriptEngineManager;
+
+    /**
+     * Creates the worker for a session and queues the request that opened that session as its first task.
+     *
+     * @param gremlinContext the first request of the session, also used as the source of the server settings,
+     *                       the server-wide {@link GremlinExecutor} and the scheduled executor
+     * @param sessionId the identifier of the session this worker serves
+     * @param sessions the registry of workers, handed on to the superclass
+     */
+    MultiRexster(final Context gremlinContext, final String sessionId,
+                 final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId, false, sessions);
+
+        // this field must be set before initializeGremlinExecutor() is called below because that method hands
+        // the field to the GremlinExecutor builder. assigning it afterwards means the builder is given null.
+        scheduledExecutorService = gremlinContext.getScheduledExecutorService();
+
+        // using a global function cache is cheaper than creating a new one per session especially if you have to
+        // create a lot of sessions. it will generate a ton of throw-away objects. mostly keeping the option open
+        // to not use it to preserve the ability to use the old functionality if wanted or if there is some specific
+        // use case with sessions that needs it. if we wanted this could eventually become a per-request option
+        // so that the client could control it as necessary and get scriptengine isolation if they need it.
+        if (gremlinContext.getSettings().useCommonEngineForSessions)
+            scriptEngineManager = gremlinContext.getGremlinExecutor().getScriptEngineManager();
+        else
+            scriptEngineManager = initializeGremlinExecutor(gremlinContext).getScriptEngineManager();
+
+        addTask(gremlinContext);
+    }
+
+ /**
+ * Looks up the script engine for a language in the {@code GremlinScriptEngineManager} chosen for this session
+ * at construction time.
+ *
+ * @param gremlinContext not consulted by this implementation
+ * @param language the name of the script engine to retrieve
+ * @return whatever the manager returns for {@code language}
+ */
+ @Override
+ public GremlinScriptEngine getScriptEngine(final Context gremlinContext, final String language) {
+ return scriptEngineManager.getEngineByName(language);
+ }
+
+ /**
+ * Reports whether this worker still takes tasks, which is the case until the {@code ending} flag is set by a
+ * failure in {@link #run()} or by {@link #close()}.
+ */
+ @Override
+ public boolean acceptingTasks() {
+ return !ending.get();
+ }
+
+ @Override
+ public void addTask(final Context gremlinContext) {
+ // todo: explicitly reject request???
+ if (acceptingTasks())
+ queue.offer(gremlinContext);
+ }
+
+    /**
+     * Processes the queued requests of the session one after another on the calling thread until a failure
+     * occurs, then closes the worker.
+     * <p/>
+     * A failure while processing a request stops the worker from accepting further tasks and is passed to
+     * {@code handleException()}. A {@link RexsterException} escaping from {@code startTransaction()} or from
+     * that exception handling causes every request still in the queue to be answered with an error, the
+     * transaction to be rolled back and the error response to be written for the request that failed.
+     *
+     * @throws IllegalStateException if the queue holds no request when the worker starts
+     */
+    @Override
+    public void run() {
+        // there must be one item in the queue at least since addTask() gets called before the worker
+        // is ever started
+        Context gremlinContext = queue.poll();
+        if (null == gremlinContext)
+            throw new IllegalStateException(String.format("Worker has no initial context for session: %s", getSessionId()));
+
+        try {
+            startTransaction(gremlinContext);
+            try {
+                while (true) {
+                    // schedule timeout for the current request from the queue. a timeout is only enabled when
+                    // it is greater than zero - scheduling with a zero delay would fire the timeout at once
+                    // and fail a request that was meant to run without a time limit
+                    final long seto = gremlinContext.getRequestTimeout();
+                    if (seto > 0) {
+                        requestCancelFuture = scheduledExecutorService.schedule(
+                                () -> this.triggerTimeout(seto, false),
+                                seto, TimeUnit.MILLISECONDS);
+                    }
+
+                    process(gremlinContext);
+
+                    // work is done within the timeout period so cancel it
+                    cancelRequestTimeout();
+
+                    // blocks until the next request of the session arrives
+                    gremlinContext = queue.take();
+                }
+            } catch (Exception ex) {
+                // stop accepting requests on this worker since it is heading to close()
+                ending.set(true);
+
+                // the current context gets its exception handled...
+                handleException(gremlinContext, ex);
+            }
+        } catch (RexsterException rexex) {
+            // remaining work items in the queue are ignored since this worker is closing. must send
+            // back some sort of response to satisfy the client. writeAndFlush code is different than
+            // the ResponseMessage as we don't want the message to be "final" for the Context. that
+            // status must be reserved for the message that caused the error
+            for (Context gctx : queue) {
+                gctx.writeAndFlush(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(gctx.getRequestMessage())
+                        .code(ResponseStatusCode.SERVER_ERROR)
+                        .statusMessage(String.format(
+                                "An earlier request [%s] failed prior to this one having a chance to execute",
+                                gremlinContext.getRequestMessage().getRequestId())).create());
+            }
+
+            // exception should trigger a rollback in the session. a more focused rollback may have occurred
+            // during process() and the related result iteration IF transaction management was enabled on
+            // the request
+            closeTransactionSafely(Transaction.Status.ROLLBACK);
+
+            logger.warn(rexex.getMessage(), rexex);
+            gremlinContext.writeAndFlush(rexex.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+
+ /**
+ * Stops this worker from taking further tasks, cancels a request timeout that is still pending, delegates the
+ * remaining cleanup to the superclass and logs that the session has closed.
+ */
+ @Override
+ public synchronized void close() {
+ // set the flag first so that acceptingTasks() already reports false while the rest of the shutdown runs
+ ending.set(true);
+ cancelRequestTimeout();
+ super.close();
+ logger.info("Session {} closed", getSessionId());
+ }
+
+ private void cancelRequestTimeout() {
+ if (requestCancelFuture != null && !requestCancelFuture.isDone())
+ requestCancelFuture.cancel(true);
+ }
+
+    /**
+     * Returns the bindings of this worker. The superclass is asked for them while none have been stored yet;
+     * once stored, the same instance is handed back on every later call.
+     *
+     * @throws RexsterException if the superclass fails to produce the bindings
+     */
+    @Override
+    protected Bindings getWorkerBindings() throws RexsterException {
+        if (bindings != null) return bindings;
+
+        bindings = super.getWorkerBindings();
+        return bindings;
+    }
+
+    /**
+     * Builds a {@link GremlinExecutor} for this session from the server settings. The result is used by the
+     * constructor only as a source for a {@code GremlinScriptEngineManager}.
+     * <p/>
+     * Note that the plugin configuration held by the supplied settings is modified in place: for every script
+     * engine that has plugins configured, the {@link GroovyCompilerGremlinPlugin} entry is created if absent
+     * and its global function cache option is set from {@code useGlobalFunctionCacheForSessions}.
+     *
+     * @param gremlinContext supplies the settings and the executor service of the server-wide executor
+     * @return a newly created executor
+     */
+    protected GremlinExecutor initializeGremlinExecutor(final Context gremlinContext) {
+        final Settings serverSettings = gremlinContext.getSettings();
+        final ExecutorService sharedExecutor = gremlinContext.getGremlinExecutor().getExecutorService();
+        final boolean functionCacheEnabled = serverSettings.useGlobalFunctionCacheForSessions;
+        final String compilerPlugin = GroovyCompilerGremlinPlugin.class.getName();
+
+        // these initial settings don't matter so much as we don't really execute things through the
+        // GremlinExecutor directly. Just doing all this setup to make GremlinExecutor do the work of
+        // rigging up the GremlinScriptEngineManager.
+        final GremlinExecutor.Builder builder = GremlinExecutor.build()
+                .evaluationTimeout(serverSettings.getEvaluationTimeout())
+                .executorService(sharedExecutor)
+                .globalBindings(graphManager.getAsBindings())
+                .scheduledExecutorService(scheduledExecutorService);
+
+        serverSettings.scriptEngines.forEach((engineName, engineSettings) -> {
+            // engines without plugins are not registered with the builder at all
+            if (engineSettings.plugins.isEmpty()) return;
+
+            // make sure that server related classes are available at init. the LifeCycleHook stuff will be
+            // added explicitly via configuration using GremlinServerGremlinModule in the yaml. need to override
+            // scriptengine settings with SessionOpProcessor specific ones as the processing for sessions is
+            // different and a global setting may not make sense for a session
+            if (!engineSettings.plugins.containsKey(compilerPlugin))
+                engineSettings.plugins.put(compilerPlugin, new HashMap<>());
+            engineSettings.plugins.get(compilerPlugin).put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, functionCacheEnabled);
+
+            builder.addPlugins(engineName, engineSettings.plugins);
+        });
+
+        return builder.create();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
new file mode 100644
index 0000000..70869dd
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Rexster.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.Channel;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.server.Context;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * The {@code Rexster} interface is essentially a form of "worker", named for Gremlin's trusty canine robot friend
+ * who represents the "server" aspect of TinkerPop. In the most generic sense, {@code Rexster} implementations take
+ * tasks, a Gremlin Server {@link Context} and process them, which typically means "execute some Gremlin".
+ * Implementations have a fair amount of flexibility in terms of how they go about doing this, but in most cases,
+ * the {@link SingleRexster} and the {@link MultiRexster} should handle most cases quite well and are extensible for
+ * providers.
+ */
+public interface Rexster extends Runnable {
+ /**
+ * Adds a task for Rexster to complete.
+ *
+ * @param gremlinContext the request to process
+ */
+ void addTask(final Context gremlinContext);
+
+ /**
+ * Sets a reference to the job that will cancel this Rexster if it exceeds its timeout period.
+ *
+ * @param f the scheduled cancellation job
+ */
+ void setSessionCancelFuture(final ScheduledFuture<?> f);
+
+ /**
+ * Sets a reference to the job itself that is running this Rexster.
+ *
+ * @param f the future returned when this Rexster was submitted for execution
+ */
+ void setSessionFuture(final Future<?> f);
+
+ /**
+ * Provides a general way to tell Rexster that it has exceeded some timeout condition.
+ *
+ * @param timeout the timeout period that was exceeded
+ * @param causedBySession {@code true} if the lifetime of the session was exceeded, {@code false} if the
+ * timeout belongs to an individual request
+ */
+ void triggerTimeout(final long timeout, boolean causedBySession);
+
+ /**
+ * Determines if the supplied {@code Channel} object is the same as the one bound to the {@code Session}.
+ *
+ * @param channel the channel to compare against
+ */
+ boolean isBoundTo(final Channel channel);
+
+ /**
+ * Determines if this Rexster can accept additional tasks or not.
+ */
+ boolean acceptingTasks();
+
+ /**
+ * A failure raised while a Rexster handles a request. It carries the {@link ResponseMessage} that describes
+ * the failure to the client.
+ */
+ public class RexsterException extends Exception {
+ private final ResponseMessage responseMessage;
+
+ /**
+ * @param message the exception message
+ * @param responseMessage the response to return to the client
+ */
+ public RexsterException(final String message, final ResponseMessage responseMessage) {
+ super(message);
+ this.responseMessage = responseMessage;
+ }
+
+ /**
+ * @param message the exception message
+ * @param cause the underlying failure
+ * @param responseMessage the response to return to the client
+ */
+ public RexsterException(final String message, final Throwable cause, final ResponseMessage responseMessage) {
+ super(message, cause);
+ this.responseMessage = responseMessage;
+ }
+
+ /**
+ * Gets the response that was supplied when this exception was created.
+ */
+ public ResponseMessage getResponseMessage() {
+ return this.responseMessage;
+ }
+ }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
index d58553d..31dabd0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAndHttpBasicAuthenticationHandler.java
@@ -21,14 +21,14 @@ package org.apache.tinkerpop.gremlin.server.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpMessage;
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
import org.apache.tinkerpop.gremlin.server.Settings;
-import org.apache.tinkerpop.gremlin.server.handler.HttpBasicAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
-import org.apache.tinkerpop.gremlin.server.handler.WebSocketHandlerUtil;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_AUTHORIZER;
import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
/**
@@ -39,18 +39,33 @@ public class SaslAndHttpBasicAuthenticationHandler extends SaslAuthenticationHan
private final String HTTP_AUTH = "http-authentication";
+ /**
+ * @deprecated As of release 3.5.0, replaced by {@link #SaslAndHttpBasicAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+ */
+ @Deprecated
public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
- super(authenticator, settings);
+ this(authenticator, null, settings);
+ }
+
+ public SaslAndHttpBasicAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+ super(authenticator, authorizer, settings);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object obj) throws Exception {
if (obj instanceof HttpMessage && !WebSocketHandlerUtil.isWebSocket((HttpMessage)obj)) {
- ChannelPipeline pipeline = ctx.pipeline();
+ final ChannelPipeline pipeline = ctx.pipeline();
if (null != pipeline.get(HTTP_AUTH)) {
pipeline.remove(HTTP_AUTH);
}
pipeline.addAfter(PIPELINE_AUTHENTICATOR, HTTP_AUTH, new HttpBasicAuthenticationHandler(authenticator, this.settings));
+
+ if (authorizer != null) {
+ final ChannelInboundHandlerAdapter authorizationHandler = new HttpBasicAuthorizationHandler(authorizer);
+ pipeline.remove(PIPELINE_AUTHORIZER);
+ pipeline.addAfter(HTTP_AUTH, PIPELINE_AUTHORIZER, authorizationHandler);
+ }
+
ctx.fireChannelRead(obj);
} else {
super.channelRead(ctx, obj);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
index e8216a6..55da853 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
+import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,8 +60,16 @@ public class SaslAuthenticationHandler extends AbstractAuthenticationHandler {
protected final Settings settings;
+ /**
+ * @deprecated As of release 3.5.0, replaced by {@link #SaslAuthenticationHandler(Authenticator, Authorizer, Settings)}.
+ */
+ @Deprecated
public SaslAuthenticationHandler(final Authenticator authenticator, final Settings settings) {
- super(authenticator);
+ this(authenticator, null, settings);
+ }
+
+ public SaslAuthenticationHandler(final Authenticator authenticator, final Authorizer authorizer, final Settings settings) {
+ super(authenticator, authorizer);
this.settings = settings;
}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
new file mode 100644
index 0000000..05d5dff
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SingleRexster.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+
+/**
+ * A simple {@link Rexster} implementation that accepts one request, processes it and exits.
+ */
+public class SingleRexster extends AbstractRexster {
+    private static final Logger logger = LoggerFactory.getLogger(SingleRexster.class);
+
+    /** The one request this worker was created for. */
+    protected final Context gremlinContext;
+
+    /**
+     * Creates a worker for a single request.
+     *
+     * @param gremlinContext the request to process
+     * @param sessionId the identifier under which this worker is tracked
+     * @param sessions the registry of workers, handed on to the superclass
+     */
+    SingleRexster(final Context gremlinContext, final String sessionId,
+                  final ConcurrentMap<String, Rexster> sessions) {
+        super(gremlinContext, sessionId, true, sessions);
+        this.gremlinContext = gremlinContext;
+    }
+
+    /**
+     * The {@code SingleRexster} can only process one request so the initial construction of it already has the
+     * request in it and no more can be added, therefore this method always returns {@code false}.
+     */
+    @Override
+    public boolean acceptingTasks() {
+        return false;
+    }
+
+    /**
+     * Always rejects the task since the only request this worker handles is the one given to the constructor.
+     *
+     * @throws UnsupportedOperationException on every call
+     */
+    @Override
+    public void addTask(final Context gremlinContext) {
+        throw new UnsupportedOperationException("SingleRexster doesn't accept tasks beyond the one provided to the constructor");
+    }
+
+    /**
+     * Starts the transaction, processes the request and closes the worker. A {@link RexsterException} is
+     * logged, followed by a rollback and by writing the response carried by the exception to the client.
+     */
+    @Override
+    public void run() {
+        try {
+            startTransaction(gremlinContext);
+            process(gremlinContext);
+        } catch (RexsterException we) {
+            logger.warn(we.getMessage(), we);
+
+            // should have already rolledback - this is a safety valve
+            closeTransactionSafely(gremlinContext, Transaction.Status.ROLLBACK);
+
+            gremlinContext.writeAndFlush(we.getResponseMessage());
+        } finally {
+            close();
+        }
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
new file mode 100644
index 0000000..7483062
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnifiedHandler.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.process.traversal.Operator;
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Pop;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.server.Channelizer;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.Rexster.RexsterException;
+import org.apache.tinkerpop.gremlin.structure.Column;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Handler for websockets to be used with the {@link UnifiedChannelizer}.
+ */
+@ChannelHandler.Sharable
+public class UnifiedHandler extends SimpleChannelInboundHandler<RequestMessage> {
+ private static final Logger logger = LoggerFactory.getLogger(UnifiedHandler.class);
+
+ protected final Settings settings;
+ protected final GraphManager graphManager;
+ protected final GremlinExecutor gremlinExecutor;
+ protected final ScheduledExecutorService scheduledExecutorService;
+ protected final ExecutorService executorService;
+ protected final Channelizer channelizer;
+
+ protected final ConcurrentMap<String, Rexster> sessions = new ConcurrentHashMap<>();
+
+ /**
+ * This may or may not be the full set of invalid binding keys. It is dependent on the static imports made to
+ * Gremlin Server. This should get rid of the worst offenders though and provide a good message back to the
+ * calling client.
+ * <p/>
+ * Use of {@code toUpperCase()} on the accessor values of {@link T} solves an issue where the {@code ScriptEngine}
+ * ignores private scope on {@link T} and imports static fields.
+ */
+ protected static final Set<String> INVALID_BINDINGS_KEYS = new HashSet<>();
+
+ static {
+ INVALID_BINDINGS_KEYS.addAll(Arrays.asList(
+ T.id.name(), T.key.name(),
+ T.label.name(), T.value.name(),
+ T.id.getAccessor(), T.key.getAccessor(),
+ T.label.getAccessor(), T.value.getAccessor(),
+ T.id.getAccessor().toUpperCase(), T.key.getAccessor().toUpperCase(),
+ T.label.getAccessor().toUpperCase(), T.value.getAccessor().toUpperCase()));
+
+ for (Column enumItem : Column.values()) {
+ INVALID_BINDINGS_KEYS.add(enumItem.name());
+ }
+
+ for (Order enumItem : Order.values()) {
+ INVALID_BINDINGS_KEYS.add(enumItem.name());
+ }
+
+ for (Operator enumItem : Operator.values()) {
+ INVALID_BINDINGS_KEYS.add(enumItem.name());
+ }
+
+ for (Scope enumItem : Scope.values()) {
+ INVALID_BINDINGS_KEYS.add(enumItem.name());
+ }
+
+ for (Pop enumItem : Pop.values()) {
+ INVALID_BINDINGS_KEYS.add(enumItem.name());
+ }
+ }
+
+ /**
+ * Creates the handler from the components of the running server.
+ *
+ * @param settings the server settings
+ * @param graphManager the graph manager passed to request validation and to each request {@link Context}
+ * @param gremlinExecutor the executor passed to each request {@link Context}; its executor service is the
+ * pool that {@link Rexster} instances are submitted to
+ * @param scheduledExecutorService used to schedule the timeout of a session
+ * @param channelizer consulted for idle monitor support and for the idle detection message
+ */
+ public UnifiedHandler(final Settings settings, final GraphManager graphManager,
+ final GremlinExecutor gremlinExecutor,
+ final ScheduledExecutorService scheduledExecutorService,
+ final Channelizer channelizer) {
+ this.settings = settings;
+ this.graphManager = graphManager;
+ this.gremlinExecutor = gremlinExecutor;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.executorService = gremlinExecutor.getExecutorService();
+ this.channelizer = channelizer;
+ }
+
+    /**
+     * Validates an incoming request and routes it to a {@link Rexster}. A request that names a session already
+     * known to this handler is added to the worker of that session, provided the worker still accepts tasks and
+     * is bound to the requesting channel. Any other request gets a new worker which is submitted for execution,
+     * registered under its session identifier and given a timeout job if a timeout is enabled.
+     *
+     * @param ctx the channel context the request arrived on
+     * @param msg the request to process
+     */
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final RequestMessage msg) throws Exception {
+        try {
+            try {
+                validateRequest(msg, graphManager);
+            } catch (RexsterException we) {
+                ctx.writeAndFlush(we.getResponseMessage());
+                return;
+            }
+
+            // a request without a session gets a generated identifier - only generate it when it is needed
+            final Optional<String> optSession = msg.optionalArgs(Tokens.ARGS_SESSION);
+            final String sessionId = optSession.orElseGet(() -> UUID.randomUUID().toString());
+
+            // still using GremlinExecutor here in the Context so that this object doesn't need to immediately
+            // change, but also because GremlinExecutor/ScriptEngine config is all rigged up into the server nicely
+            // right now. when the UnifiedChannelizer is "ready" we can factor out the GremlinExecutor
+            final Context gremlinContext = new Context(msg, ctx, settings, graphManager,
+                    gremlinExecutor, scheduledExecutorService);
+
+            // a single lookup is used rather than containsKey() followed by get() because the entry may be
+            // removed from the map by another thread between those two calls
+            final Rexster existing = sessions.get(sessionId);
+            if (existing != null) {
+                // check if the session is still accepting requests - if not block further requests
+                if (!existing.acceptingTasks()) {
+                    final String sessionClosedMessage = String.format(
+                            "Session %s is no longer accepting requests as it has been closed", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionClosedMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                // check if the session is bound to this channel, thus one client per session
+                if (!existing.isBoundTo(gremlinContext.getChannelHandlerContext().channel())) {
+                    final String sessionNotBoundMessage = String.format("Session %s is not bound to the connecting client", sessionId);
+                    final ResponseMessage response = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(sessionNotBoundMessage).create();
+                    ctx.writeAndFlush(response);
+                    return;
+                }
+
+                existing.addTask(gremlinContext);
+            } else {
+                final Rexster rexster = optSession.isPresent() ?
+                        createMulti(gremlinContext, sessionId) : createSingle(gremlinContext, sessionId);
+                final Future<?> sessionFuture = executorService.submit(rexster);
+                rexster.setSessionFuture(sessionFuture);
+                sessions.put(sessionId, rexster);
+
+                // determine the max session life. for multi that's going to be "session life" and for single that
+                // will be the span of the request timeout
+                final long seto = gremlinContext.getRequestTimeout();
+                final long sessionLife = optSession.isPresent() ? settings.sessionLifetimeTimeout : seto;
+
+                // a timeout is enabled when greater than zero. the check is made on the period that is actually
+                // scheduled so that a session is neither cancelled at once by a zero session lifetime nor left
+                // without its lifetime limit because the request timeout happens to be disabled
+                if (sessionLife > 0) {
+                    final ScheduledFuture<?> sessionCancelFuture =
+                            scheduledExecutorService.schedule(
+                                    () -> rexster.triggerTimeout(sessionLife, optSession.isPresent()),
+                                    sessionLife, TimeUnit.MILLISECONDS);
+                    rexster.setSessionCancelFuture(sessionCancelFuture);
+                }
+            }
+        } finally {
+            ReferenceCountUtil.release(msg);
+        }
+    }
+
+    /**
+     * Verifies that a request is well-formed before it is handed to a {@link Rexster}. The request must carry
+     * a Gremlin argument. Bindings, if present, must use non-null {@code String} keys that are not reserved
+     * names, and the keys not starting with "#jsr223" must not outnumber {@code settings.maxParameters}. A
+     * bytecode request must carry exactly one alias with the expected name, and that alias must refer to a
+     * traversal source known to the {@code graphManager}.
+     *
+     * @param message the request to check
+     * @param graphManager supplies the names of the configured traversal sources
+     * @throws RexsterException on the first check that fails, carrying a response with the
+     *                          {@code REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS} status code
+     */
+    protected void validateRequest(final RequestMessage message, final GraphManager graphManager) throws RexsterException {
+        if (!message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) {
+            throw invalidArguments(message, String.format(
+                    "A message with an [%s] op code requires a [%s] argument.", message.getOp(), Tokens.ARGS_GREMLIN));
+        }
+
+        if (message.optionalArgs(Tokens.ARGS_BINDINGS).isPresent()) {
+            final Map bindings = (Map) message.getArgs().get(Tokens.ARGS_BINDINGS);
+
+            if (IteratorUtils.anyMatch(bindings.keySet().iterator(), k -> null == k || !(k instanceof String))) {
+                throw invalidArguments(message, String.format(
+                        "The [%s] message is using one or more invalid binding keys - they must be of type String and cannot be null", Tokens.OPS_EVAL));
+            }
+
+            final Set<String> badBindings = IteratorUtils.set(IteratorUtils.<String>filter(bindings.keySet().iterator(), INVALID_BINDINGS_KEYS::contains));
+            if (!badBindings.isEmpty()) {
+                throw invalidArguments(message, String.format(
+                        "The [%s] message supplies one or more invalid parameters key of [%s] - these are reserved names.", Tokens.OPS_EVAL, badBindings));
+            }
+
+            // ignore control bindings that get passed in with the "#jsr223" prefix - those aren't used in compilation
+            if (IteratorUtils.count(IteratorUtils.filter(bindings.keySet().iterator(), k -> !k.toString().startsWith("#jsr223"))) > settings.maxParameters) {
+                throw invalidArguments(message, String.format(
+                        "The [%s] message contains %s bindings which is more than is allowed by the server %s configuration",
+                        Tokens.OPS_EVAL, bindings.size(), settings.maxParameters));
+            }
+        }
+
+        // the remaining checks only concern bytecode, which must have an alias defined
+        if (!message.getOp().equals(Tokens.OPS_BYTECODE)) return;
+
+        final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
+        if (!aliases.isPresent()) {
+            throw invalidArguments(message, String.format(
+                    "A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES));
+        }
+
+        final Map<String, String> aliasMap = aliases.get();
+        if (aliasMap.size() != 1 || !aliasMap.containsKey(Tokens.VAL_TRAVERSAL_SOURCE_ALIAS)) {
+            throw invalidArguments(message, String.format(
+                    "A message with [%s] op code requires the [%s] argument to be a Map containing one alias assignment named '%s'.",
+                    Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS));
+        }
+
+        final String traversalSourceBindingForAlias = aliasMap.values().iterator().next();
+        if (!graphManager.getTraversalSourceNames().contains(traversalSourceBindingForAlias)) {
+            throw invalidArguments(message, String.format(
+                    "The traversal source [%s] for alias [%s] is not configured on the server.",
+                    traversalSourceBindingForAlias, Tokens.VAL_TRAVERSAL_SOURCE_ALIAS));
+        }
+    }
+
+    /**
+     * Creates the exception for a request that failed validation, using the text both as the exception message
+     * and as the status message of the response.
+     */
+    private static RexsterException invalidArguments(final RequestMessage message, final String msg) {
+        return new RexsterException(msg, ResponseMessage.build(message)
+                .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+    }
+
+    /**
+     * Reacts to idle state events when the channelizer supports the idle monitor: a channel with an idle
+     * reader is closed, and a channel with an idle writer is sent the idle detection message of the
+     * channelizer if a keep-alive interval is configured. All other events are ignored.
+     */
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
+        // only need to handle this event if the idle monitor is on
+        if (!channelizer.supportsIdleMonitor()) return;
+        if (!(evt instanceof IdleStateEvent)) return;
+
+        final IdleState idleState = ((IdleStateEvent) evt).state();
+
+        // if no requests (reader) then close, if no writes from server to client then ping. clients should
+        // periodically ping the server, but coming from this direction allows the server to kill channels that
+        // have dead clients on the other end
+        if (idleState == IdleState.READER_IDLE) {
+            logger.info("Closing channel - client is disconnected after idle period of " + settings.idleConnectionTimeout + " " + ctx.channel().id().asShortText());
+            ctx.close();
+            return;
+        }
+
+        if (idleState == IdleState.WRITER_IDLE && settings.keepAliveInterval > 0) {
+            logger.info("Checking channel - sending ping to client after idle period of " + settings.keepAliveInterval + " " + ctx.channel().id().asShortText());
+            ctx.writeAndFlush(channelizer.createIdleDetectionMessage());
+        }
+    }
+
+ /**
+ * Creates the worker for a request that does not specify a session. May be overridden to supply a different
+ * {@link Rexster} implementation.
+ *
+ * @param gremlinContext the request to process
+ * @param sessionId the identifier generated for the request
+ */
+ protected Rexster createSingle(final Context gremlinContext, final String sessionId) {
+ return new SingleRexster(gremlinContext, sessionId, sessions);
+ }
+
+ /**
+ * Creates the worker for a request that opens a session. May be overridden to supply a different
+ * {@link Rexster} implementation.
+ *
+ * @param gremlinContext the first request of the session
+ * @param sessionId the identifier of the session as given by the request
+ */
+ protected Rexster createMulti(final Context gremlinContext, final String sessionId) {
+ return new MultiRexster(gremlinContext, sessionId, sessions);
+ }
+
+ /**
+ * Determines if a worker is currently registered with this handler under the given identifier.
+ */
+ public boolean isActiveSession(final String sessionId) {
+ return sessions.containsKey(sessionId);
+ }
+
+ /**
+ * Gets the number of workers currently registered with this handler.
+ */
+ public int getActiveSessionCount() {
+ return sessions.size();
+ }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
index e4f79b4..a93e1c4 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
@@ -28,9 +28,9 @@ import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
import org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer;
import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
+import static org.apache.tinkerpop.gremlin.server.AbstractChannelizer.PIPELINE_HTTP_AGGREGATOR;
import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_AUTHENTICATOR;
import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_REQUEST_HANDLER;
-import static org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer.PIPELINE_HTTP_RESPONSE_ENCODER;
/**
* A ChannelInboundHandlerAdapter for use with {@link WsAndHttpChannelizer} that toggles between WebSockets
@@ -66,11 +66,11 @@ public class WsAndHttpChannelizerHandler extends ChannelInboundHandlerAdapter {
pipeline.remove(PIPELINE_REQUEST_HANDLER);
final ChannelHandler authenticator = pipeline.get(PIPELINE_AUTHENTICATOR);
pipeline.remove(PIPELINE_AUTHENTICATOR);
- pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_AUTHENTICATOR, authenticator);
+ pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_AUTHENTICATOR, authenticator);
pipeline.addAfter(PIPELINE_AUTHENTICATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
} else {
pipeline.remove(PIPELINE_REQUEST_HANDLER);
- pipeline.addAfter(PIPELINE_HTTP_RESPONSE_ENCODER, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
+ pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
}
}
ctx.fireChannelRead(obj);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
index 55f89e6..deb1734 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java
@@ -18,7 +18,14 @@
*/
package org.apache.tinkerpop.gremlin.server;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
+import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizerIntegrateTest;
import org.apache.tinkerpop.gremlin.server.op.OpLoader;
+import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
+import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -96,16 +103,28 @@ public abstract class AbstractGremlinServerIntegrationTest {
startServer(settings);
}
+ private boolean shouldTestUnified() {
+ // ignore all tests in the UnifiedChannelizerIntegrateTest package as they are already rigged to test
+ // over the various channelizer implementations
+ return Boolean.parseBoolean(System.getProperty("testUnified", "false")) &&
+ !this.getClass().getPackage().equals(UnifiedChannelizerIntegrateTest.class.getPackage());
+ }
+
public void startServer(final Settings settings) throws Exception {
if (null == settings) {
startServer();
} else {
- final Settings overridenSettings = overrideSettings(settings);
- ServerTestHelper.rewritePathsInGremlinServerSettings(overridenSettings);
+ final Settings oSettings = overrideSettings(settings);
+
+ if (shouldTestUnified()) {
+ oSettings.channelizer = UnifiedChannelizer.class.getName();
+ }
+
+ ServerTestHelper.rewritePathsInGremlinServerSettings(oSettings);
if (GREMLIN_SERVER_EPOLL) {
- overridenSettings.useEpollEventLoop = true;
+ oSettings.useEpollEventLoop = true;
}
- this.server = new GremlinServer(overridenSettings);
+ this.server = new GremlinServer(oSettings);
server.start().join();
}
}
@@ -119,6 +138,10 @@ public abstract class AbstractGremlinServerIntegrationTest {
overriddenSettings.useEpollEventLoop = true;
}
+ if (shouldTestUnified()) {
+ overriddenSettings.channelizer = UnifiedChannelizer.class.getName();
+ }
+
this.server = new GremlinServer(overriddenSettings);
server.start().join();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
index f8956e2..e685932 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
@@ -47,7 +47,8 @@ public class ContextTest {
private final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
private final RequestMessage request = RequestMessage.build("test").create();
- private final Context context = new Context(request, ctx, null, null, null, null);
+ private final Settings settings = new Settings();
+ private final Context context = new Context(request, ctx, settings, null, null, null);
private final Log4jRecordingAppender recordingAppender = new Log4jRecordingAppender();
private Level originalLogLevel;
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 3c1d308..c11cad8 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -42,6 +42,7 @@ import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -99,6 +100,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assume.assumeThat;
import static org.mockito.Mockito.verify;
/**
@@ -1645,16 +1647,60 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
final CompletableFuture<List<Result>> futureThird = third.get().all();
final CompletableFuture<List<Result>> futureFourth = fourth.get().all();
- assertFutureTimeout(futureFirst);
- assertFutureTimeout(futureSecond);
- assertFutureTimeout(futureThird);
- assertFutureTimeout(futureFourth);
+ // there is slightly different assertion logic with UnifiedChannelizer given differences in session
+ // behavior where UnifiedChannelizer sessions won't continue processing in the face of a timeout and
+ // a new session will need to be created
+ if (server.getServerGremlinExecutor().getSettings().channelizer.equals(UnifiedChannelizer.class.getName())) {
+ // first times out and the rest get SERVER_ERROR
+ try {
+ futureFirst.get();
+ fail("Should have timed out");
+ } catch (Exception ex) {
+ final Throwable root = ExceptionUtils.getRootCause(ex);
+ assertThat(root, instanceOf(ResponseException.class));
+ assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+ assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+ }
+
+ assertFutureTimeoutUnderUnified(futureSecond);
+ assertFutureTimeoutUnderUnified(futureThird);
+ assertFutureTimeoutUnderUnified(futureFourth);
+ } else {
+ assertFutureTimeout(futureFirst);
+ assertFutureTimeout(futureSecond);
+ assertFutureTimeout(futureThird);
+ assertFutureTimeout(futureFourth);
+ }
}
} finally {
cluster.close();
}
}
+ private void assertFutureTimeoutUnderUnified(final CompletableFuture<List<Result>> f) {
+ try {
+ f.get();
+ fail("Should have timed out");
+ } catch (Exception ex) {
+ final Throwable root = ExceptionUtils.getRootCause(ex);
+ assertThat(root, instanceOf(ResponseException.class));
+ assertEquals(ResponseStatusCode.SERVER_ERROR, ((ResponseException) root).getResponseStatusCode());
+ assertThat(root.getMessage(), allOf(startsWith("An earlier request"), endsWith("failed prior to this one having a chance to execute")));
+ }
+ }
+
+ private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
+ try {
+ f.get();
+ fail("Should have timed out");
+ } catch (Exception ex) {
+ final Throwable root = ExceptionUtils.getRootCause(ex);
+ assertThat(root, instanceOf(ResponseException.class));
+ assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode());
+ assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
+ }
+ }
+
@Test
public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
final Cluster cluster = TestClientFactory.open();
@@ -1780,20 +1826,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
}
- private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
- try
- {
- futureFirst.get();
- fail("Should have timed out");
- }
- catch (Exception ex)
- {
- final Throwable root = ExceptionUtils.getRootCause(ex);
- assertThat(root, instanceOf(ResponseException.class));
- assertThat(root.getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 250 ms"));
- }
- }
-
@Test
public void shouldClusterReadFileFromResources() throws Exception {
final Cluster cluster = Cluster.open(TestClientFactory.RESOURCE_PATH);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
index a9d7228..e81f889 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogDeprecatedIntegrateTest.java
@@ -31,10 +31,10 @@ import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
-import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +81,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
rootLogger.addAppender(recordingAppender);
try {
- final String moduleBaseDir = System.getProperty("basedir");
+ final String moduleBaseDir = System.getProperty("basedir", ".");
final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
System.setProperty("java.security.auth.login.config", authConfigName);
kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +139,7 @@ public class GremlinServerAuditLogDeprecatedIntegrateTest extends AbstractGremli
settings.host = "localhost";
settings.channelizer = HttpChannelizer.class.getName();
authSettings.authenticator = SimpleAuthenticator.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
break;
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
index 69bb974..28ad1e1 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuditLogIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.server.auth.Krb5Authenticator;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
@@ -81,7 +82,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
rootLogger.addAppender(recordingAppender);
try {
- final String moduleBaseDir = System.getProperty("basedir");
+ final String moduleBaseDir = System.getProperty("basedir", ".");
final String authConfigName = moduleBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
System.setProperty("java.security.auth.login.config", authConfigName);
kdcServer = new KdcFixture(moduleBaseDir);
@@ -139,6 +140,7 @@ public class GremlinServerAuditLogIntegrateTest extends AbstractGremlinServerInt
settings.host = "localhost";
settings.channelizer = HttpChannelizer.class.getName();
authSettings.authenticator = SimpleAuthenticator.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
break;
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
index 4ec467e..7d10a3a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthKrb5IntegrateTest.java
@@ -65,7 +65,7 @@ public class GremlinServerAuthKrb5IntegrateTest extends AbstractGremlinServerInt
handlerLogger.setLevel(Level.OFF);
try {
- final String projectBaseDir = System.getProperty("basedir");
+ final String projectBaseDir = System.getProperty("basedir", ".");
final String authConfigName = projectBaseDir + "/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-console-jaas.conf";
System.setProperty("java.security.auth.login.config", authConfigName);
kdcServer = new KdcFixture(projectBaseDir);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
index 5614123..00eacda 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthzIntegrateTest.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.server.auth.AllowAllAuthenticator;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.apache.tinkerpop.gremlin.server.authz.AllowListAuthorizer;
import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.apache.tinkerpop.gremlin.util.function.Lambda;
@@ -116,10 +117,12 @@ public class GremlinServerAuthzIntegrateTest extends AbstractGremlinServerIntegr
case "shouldFailAuthorizeWithHttpTransport":
case "shouldKeepAuthorizingWithHttpTransport":
settings.channelizer = HttpChannelizer.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
break;
case "shouldAuthorizeWithAllowAllAuthenticatorAndHttpTransport":
settings.channelizer = HttpChannelizer.class.getName();
authSettings.authenticator = AllowAllAuthenticator.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
authSettings.config = null;
final String fileHttp = Objects.requireNonNull(getClass().getClassLoader().getResource(yamlHttpName)).getFile();
authzSettings.config.put(AllowListAuthorizer.KEY_AUTHORIZATION_ALLOWLIST, fileHttp);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
index fadc5e2..05cd939 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
@@ -36,6 +36,7 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -133,6 +134,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
private void configureForAuthentication(final Settings settings) {
final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
authSettings.authenticator = SimpleAuthenticator.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
// use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
final Map<String,Object> authConfig = new HashMap<>();
@@ -147,7 +149,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra
authSettings.authenticator = SimpleAuthenticator.class.getName();
//Add basic auth handler to make sure the reflection code path works.
- authSettings.authenticationHandler = HttpBasicAuthenticationHandler.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
// use a credentials graph with two users in it: stephen/password and marko/rainbow-dash
final Map<String,Object> authConfig = new HashMap<>();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 4f0922e..270bed2 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
+import org.apache.tinkerpop.gremlin.server.handler.UnifiedHandler;
import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -58,7 +59,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -80,6 +80,7 @@ import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalS
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.AllOf.allOf;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
import static org.hamcrest.core.StringStartsWith.startsWith;
@@ -115,9 +116,11 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
if (name.getMethodName().equals("shouldPingChannelIfClientDies") ||
name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
- final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
- previousLogLevel = webSocketClientHandlerLogger.getLevel();
- webSocketClientHandlerLogger.setLevel(Level.INFO);
+ final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+ final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+ previousLogLevel = opSelectorHandlerLogger.getLevel();
+ opSelectorHandlerLogger.setLevel(Level.INFO);
+ unifiedHandlerLogger.setLevel(Level.INFO);
}
rootLogger.addAppender(recordingAppender);
@@ -129,8 +132,10 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
if (name.getMethodName().equals("shouldPingChannelIfClientDies")||
name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
- final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
- webSocketClientHandlerLogger.setLevel(previousLogLevel);
+ final org.apache.log4j.Logger opSelectorHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
+ opSelectorHandlerLogger.setLevel(previousLogLevel);
+ final org.apache.log4j.Logger unifiedHandlerLogger = org.apache.log4j.Logger.getLogger(UnifiedHandler.class);
+ unifiedHandlerLogger.setLevel(previousLogLevel);
}
rootLogger.removeAppender(recordingAppender);
@@ -145,6 +150,8 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
switch (nameOfTest) {
case "shouldProvideBetterExceptionForMethodCodeTooLarge":
settings.maxContentLength = 4096000;
+
+ // OpProcessor setting
final Settings.ProcessorSettings processorSettingsBig = new Settings.ProcessorSettings();
processorSettingsBig.className = StandardOpProcessor.class.getName();
processorSettingsBig.config = new HashMap<String,Object>() {{
@@ -152,6 +159,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
}};
settings.processors.clear();
settings.processors.add(processorSettingsBig);
+
+ // Unified setting
+ settings.maxParameters = Integer.MAX_VALUE;
break;
case "shouldRespectHighWaterMarkSettingAndSucceed":
settings.writeBufferHighWaterMark = 64;
@@ -182,6 +192,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
settings.scriptEngines.get("gremlin-groovy").config = getScriptEngineConfForBaseScript();
break;
case "shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable":
+ // OpProcessor settings
final Settings.ProcessorSettings processorSettingsSmall = new Settings.ProcessorSettings();
processorSettingsSmall.className = StandardOpProcessor.class.getName();
processorSettingsSmall.config = new HashMap<String,Object>() {{
@@ -189,6 +200,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
}};
settings.processors.clear();
settings.processors.add(processorSettingsSmall);
+
+ // Unified settings
+ settings.maxParameters = 1;
break;
case "shouldTimeOutRemoteTraversal":
settings.evaluationTimeout = 500;
@@ -699,7 +713,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
try (SimpleClient client = TestClientFactory.createWebSocketClient()){
final List<ResponseMessage> responses = client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
- assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 1000 ms"));
+ assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("1000 ms")));
// validate that we can still send messages to the server
assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
@@ -715,7 +729,7 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration
.addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'")
.create();
final List<ResponseMessage> responses = client.submit(msg);
- assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 100 ms"));
+ assertThat(responses.get(0).getStatus().getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("100 ms")));
// validate that we can still send messages to the server
assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 3dc58ae..fe9ff99 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
+import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.junit.After;
@@ -94,11 +95,16 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
case "shouldHaveTheSessionTimeout":
case "shouldCloseSessionOnceOnRequest":
settings.processors.clear();
+
+ // OpProcessor setting
final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings();
processorSettings.className = SessionOpProcessor.class.getCanonicalName();
processorSettings.config = new HashMap<>();
processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
settings.processors.add(processorSettings);
+
+ // Unified setting
+ settings.sessionLifetimeTimeout = 3000L;
break;
case "shouldCloseSessionOnClientClose":
clearNeo4j(settings);
@@ -106,13 +112,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
case "shouldEnsureSessionBindingsAreThreadSafe":
settings.threadPoolWorker = 2;
break;
+ case "shouldUseGlobalFunctionCache":
+ // OpProcessor settings are good by default
+ // UnifiedHandler settings
+ settings.useCommonEngineForSessions = false;
+ settings.useGlobalFunctionCacheForSessions = true;
+
+ break;
case "shouldNotUseGlobalFunctionCache":
settings.processors.clear();
+
+ // OpProcessor settings
final Settings.ProcessorSettings processorSettingsForDisableFunctionCache = new Settings.ProcessorSettings();
processorSettingsForDisableFunctionCache.className = SessionOpProcessor.class.getCanonicalName();
processorSettingsForDisableFunctionCache.config = new HashMap<>();
processorSettingsForDisableFunctionCache.config.put(SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, false);
settings.processors.add(processorSettingsForDisableFunctionCache);
+
+ // UnifiedHandler settings
+ settings.useCommonEngineForSessions = false;
+ settings.useGlobalFunctionCacheForSessions = false;
+
break;
case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient":
case "shouldExecuteInSessionWithTransactionManagement":
@@ -124,6 +144,11 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
return settings;
}
+ private boolean isUsingUnifiedChannelizer() {
+ return server.getServerGremlinExecutor().
+ getSettings().channelizer.equals(UnifiedChannelizer.class.getName());
+ }
+
private static void clearNeo4j(Settings settings) {
deleteDirectory(new File("/tmp/neo4j"));
settings.graphs.put("graph", "conf/neo4j-empty.properties");
@@ -140,8 +165,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
client1.close();
cluster1.close();
- assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
- assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+ // the session close log messages are only asserted without UnifiedChannelizer - with it, session state is checked instead
+ if (isUsingUnifiedChannelizer()) {
+ assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+ } else {
+ assertThat(recordingAppender.getMessages(), hasItem("INFO - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
+ assertThat(recordingAppender.getMessages(), hasItem("INFO - Session shouldCloseSessionOnClientClose closed\n"));
+ }
// try to reconnect to that session and make sure no state is there
final Cluster clusterReconnect = TestClientFactory.open();
@@ -159,16 +189,27 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
// the commit from client1 should not have gone through so there should be no data present.
assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());
clusterReconnect.close();
+
+ if (isUsingUnifiedChannelizer()) {
+ assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
+ }
}
@Test
public void shouldUseGlobalFunctionCache() throws Exception {
final Cluster cluster = TestClientFactory.open();
- final Client client = cluster.connect(name.getMethodName());
+ final Client session = cluster.connect(name.getMethodName());
+ final Client client = cluster.connect();
+
+ assertEquals(3, session.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
+ assertEquals(3, session.submit("sumItUp(1,2)").all().get().get(0).getInt());
try {
- assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
- assertEquals(3, client.submit("addItUp(1,2)").all().get().get(0).getInt());
+ client.submit("sumItUp(1,2)").all().get().get(0).getInt();
+ fail("Global functions should not be cached so the call to sumItUp() should fail");
+ } catch (Exception ex) {
+ final Throwable root = ExceptionUtils.getRootCause(ex);
+ assertThat(root.getMessage(), startsWith("No signature of method"));
} finally {
cluster.close();
}
@@ -180,14 +221,14 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
final Client client = cluster.connect(name.getMethodName());
try {
- assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt());
+ assertEquals(3, client.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
} catch (Exception ex) {
cluster.close();
throw ex;
}
try {
- client.submit("addItUp(1,2)").all().get().get(0).getInt();
+ client.submit("sumItUp(1,2)").all().get().get(0).getInt();
fail("Global functions should not be cached so the call to addItUp() should fail");
} catch (Exception ex) {
final Throwable root = ExceptionUtils.getRootCause(ex);
@@ -273,8 +314,12 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
cluster.close();
}
- assertEquals(1, recordingAppender.getMessages().stream()
- .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+ if (isUsingUnifiedChannelizer()) {
+ assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+ } else {
+ assertEquals(1, recordingAppender.getMessages().stream()
+ .filter(msg -> msg.equals("INFO - Session shouldCloseSessionOnceOnRequest closed\n")).count());
+ }
}
@Test
@@ -308,9 +353,13 @@ public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerInte
cluster.close();
}
- // there will be one for the timeout and a second for closing the cluster
- assertEquals(2, recordingAppender.getMessages().stream()
- .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+ if (isUsingUnifiedChannelizer()) {
+ assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
+ } else {
+ // there will be one for the timeout and a second for closing the cluster
+ assertEquals(2, recordingAppender.getMessages().stream()
+ .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
+ }
}
@Test
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
index 166764b..f28969c 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
@@ -22,6 +22,7 @@ import org.apache.http.NoHttpResponseException;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
import org.junit.Test;
import org.junit.Assert;
@@ -29,6 +30,9 @@ import java.util.Map;
import java.util.HashMap;
import java.net.SocketException;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assume.assumeThat;
+
public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
@Override
@@ -74,6 +78,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
final Map<String,Object> authConfig = new HashMap<>();
authSettings.authenticator = SimpleAuthenticator.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
authSettings.config = authConfig;
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
similarity index 58%
copy from gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
copy to gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
index 166764b..a090a20 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizerIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/channel/UnifiedChannelizerIntegrateTest.java
@@ -18,55 +18,28 @@
*/
package org.apache.tinkerpop.gremlin.server.channel;
-import org.apache.http.NoHttpResponseException;
-import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAndHttpBasicAuthenticationHandler;
-import org.junit.Test;
-import org.junit.Assert;
-
-import java.util.Map;
import java.util.HashMap;
-import java.net.SocketException;
-
-public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
-
- @Override
- public Settings overrideSettings(final Settings settings) {
- super.overrideSettings(settings);
- final String nameOfTest = name.getMethodName();
- if (nameOfTest.equals("shouldBreakOnInvalidAuthenticationHandler") ) {
- settings.authentication = getAuthSettings();
- settings.authentication.authenticationHandler = "Foo.class";
- }
- return settings;
- }
+import java.util.Map;
- @Test
- public void shouldBreakOnInvalidAuthenticationHandler() throws Exception {
- final CombinedTestClient client = new CombinedTestClient(getProtocol());
- try {
- client.sendAndAssert("2+2", 4);
- Assert.fail("An exception should be thrown with an invalid authentication handler");
- } catch (NoHttpResponseException | SocketException e) {
- } finally {
- client.close();
- }
- }
+public class UnifiedChannelizerIntegrateTest extends AbstractGremlinServerChannelizerIntegrateTest {
@Override
public String getProtocol() {
- return HTTP;
+ return WS_AND_HTTP;
}
@Override
public String getSecureProtocol() {
- return HTTPS;
+ return WSS_AND_HTTPS;
}
@Override
public String getChannelizer() {
- return HttpChannelizer.class.getName();
+ return UnifiedChannelizer.class.getName();
}
@Override
@@ -74,6 +47,7 @@ public class HttpChannelizerIntegrateTest extends AbstractGremlinServerChanneliz
final Settings.AuthenticationSettings authSettings = new Settings.AuthenticationSettings();
final Map<String,Object> authConfig = new HashMap<>();
authSettings.authenticator = SimpleAuthenticator.class.getName();
+ authSettings.authenticationHandler = SaslAndHttpBasicAuthenticationHandler.class.getName();
authConfig.put(SimpleAuthenticator.CONFIG_CREDENTIALS_DB, "conf/tinkergraph-credentials.properties");
authSettings.config = authConfig;
diff --git a/gremlin-tools/gremlin-benchmark/pom.xml b/gremlin-tools/gremlin-benchmark/pom.xml
index 688f44f..d0cdc0e 100644
--- a/gremlin-tools/gremlin-benchmark/pom.xml
+++ b/gremlin-tools/gremlin-benchmark/pom.xml
@@ -65,11 +65,6 @@ limitations under the License.
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.tinkerpop</groupId>
- <artifactId>tinkergraph-gremlin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 3e50091..d04d2e1 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -143,11 +143,6 @@ limitations under the License.
<version>1.2</version>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <version>1.19</version>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>