You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/06/05 20:46:39 UTC
[09/26] incubator-tinkerpop git commit: Added ServerGremlinExecutor.
Added ServerGremlinExecutor.
Core of server side gremlin script execution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ed8b4746
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ed8b4746
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ed8b4746
Branch: refs/heads/preprocessor
Commit: ed8b47465245f754aa9f0b03a570cbccdbdd1286
Parents: 9b4994c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Jun 4 12:30:25 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Jun 4 12:31:41 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
docs/src/gremlin-applications.asciidoc | 6 +-
.../gremlin/server/AbstractChannelizer.java | 16 +-
.../tinkerpop/gremlin/server/Channelizer.java | 11 +-
.../tinkerpop/gremlin/server/GremlinServer.java | 117 ++++---------
.../gremlin/server/channel/HttpChannelizer.java | 17 +-
.../gremlin/server/channel/NioChannelizer.java | 14 +-
.../server/channel/WebSocketChannelizer.java | 14 +-
.../server/util/ServerGremlinExecutor.java | 167 +++++++++++++++++++
.../gremlin/server/util/ThreadFactoryUtil.java | 35 ++++
10 files changed, 266 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 3620a7e..138cbd0 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ TinkerPop 3.0.0.GA (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
* Refactored SSL support in the Gremlin Server/Driver.
+* Factored out `ServerGremlinExecutor` which contains the core elements of server-side script execution in Gremlin Server.
* Bumped to netty 4.0.28.Final.
* Refactored the `Mutating` interface and introduce `CallbackRegistry` interface around `EventStrategy`.
* Changed `onReadWrite` and `onClose` of `AbstractTransaction` to be synchronized.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/docs/src/gremlin-applications.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/gremlin-applications.asciidoc b/docs/src/gremlin-applications.asciidoc
index d0e88f1..21bcf71 100644
--- a/docs/src/gremlin-applications.asciidoc
+++ b/docs/src/gremlin-applications.asciidoc
@@ -231,11 +231,11 @@ $ bin/gremlin-server.sh conf/gremlin-server-modern.yaml
[INFO] GremlinServer - Configuring Gremlin Server from conf/gremlin-server-modern.yaml
[INFO] MetricManager - Configured Metrics Slf4jReporter configured with interval=180000ms and loggerName=org.apache.tinkerpop.gremlin.server.Settings$Slf4jReporterMetrics
[INFO] Graphs - Graph [graph] was successfully configured via [conf/tinkergraph-empty.properties].
-[INFO] GremlinServer - Initialized Gremlin thread pool. Threads in pool named with pattern gremlin-*
+[INFO] ServerGremlinExecutor - Initialized Gremlin thread pool. Threads in pool named with pattern gremlin-*
[INFO] ScriptEngines - Loaded gremlin-groovy ScriptEngine
[INFO] GremlinExecutor - Initialized gremlin-groovy ScriptEngine with scripts/generate-modern.groovy
-[INFO] GremlinServer - Initialized GremlinExecutor and configured ScriptEngines.
-[INFO] GremlinServer - A GraphTraversalSource is now bound to [g] with graphtraversalsource[tinkergraph[vertices:0 edges:0], standard]
+[INFO] ServerGremlinExecutor - Initialized GremlinExecutor and configured ScriptEngines.
+[INFO] ServerGremlinExecutor - A GraphTraversalSource is now bound to [g] with graphtraversalsource[tinkergraph[vertices:0 edges:0], standard]
[INFO] GremlinServer - Executing start up LifeCycleHook
[INFO] Logger$info - Loading 'modern' graph data.
[INFO] AbstractChannelizer - Configured application/vnd.gremlin-v1.0+gryo with org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
index 1db8c81..dc7585c 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/AbstractChannelizer.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.server;
+import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
@@ -30,6 +31,7 @@ import org.apache.tinkerpop.gremlin.server.handler.OpSelectorHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,14 +92,12 @@ public abstract class AbstractChannelizer extends ChannelInitializer<SocketChann
}
@Override
- public void init(final Settings settings, final GremlinExecutor gremlinExecutor,
- final ExecutorService gremlinExecutorService,
- final Graphs graphs, final ScheduledExecutorService scheduledExecutorService) {
- this.settings = settings;
- this.gremlinExecutor = gremlinExecutor;
- this.graphs = graphs;
- this.gremlinExecutorService = gremlinExecutorService;
- this.scheduledExecutorService = scheduledExecutorService;
+ public void init(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor) {
+ this.settings = serverGremlinExecutor.getSettings();
+ this.gremlinExecutor = serverGremlinExecutor.getGremlinExecutor();
+ this.graphs = serverGremlinExecutor.getGraphs();
+ this.gremlinExecutorService = serverGremlinExecutor.getGremlinExecutorService();
+ this.scheduledExecutorService = serverGremlinExecutor.getScheduledExecutorService();
// instantiate and configure the serializers that gremlin server will use - could error out here
// and fail the server startup
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Channelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Channelizer.java
index 12103ec..fd7821e 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Channelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Channelizer.java
@@ -18,12 +18,9 @@
*/
package org.apache.tinkerpop.gremlin.server;
-import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import io.netty.channel.ChannelHandler;
-import io.netty.util.concurrent.EventExecutorGroup;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
+import io.netty.channel.EventLoopGroup;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
/**
* An interface that makes it possible to plugin different Netty pipelines to Gremlin Server, enabling the use of
@@ -38,7 +35,5 @@ public interface Channelizer extends ChannelHandler {
/**
* This method is called just after the {@code Channelizer} is initialized.
*/
- public void init(final Settings settings, final GremlinExecutor gremlinExecutor,
- final ExecutorService gremlinExecutorService,
- final Graphs graphs, final ScheduledExecutorService scheduledExecutorService);
+ public void init(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 0d31d65..8b66a84 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -18,11 +18,10 @@
*/
package org.apache.tinkerpop.gremlin.server;
-import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.server.util.LifeCycleHook;
import org.apache.tinkerpop.gremlin.server.util.MetricManager;
-import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
+import org.apache.tinkerpop.gremlin.server.util.ThreadFactoryUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
@@ -35,23 +34,16 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* Start and stop Gremlin Server.
@@ -69,8 +61,6 @@ public class GremlinServer {
private static final Logger logger = LoggerFactory.getLogger(GremlinServer.class);
private final Settings settings;
- private Optional<Graphs> graphs = Optional.empty();
- private List<LifeCycleHook> hooks = new ArrayList<>();
private Channel ch;
private CompletableFuture<Void> serverStopped = null;
@@ -79,20 +69,42 @@ public class GremlinServer {
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ExecutorService gremlinExecutorService;
+ private final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor;
+ /**
+ * Construct a Gremlin Server instance from {@link Settings}.
+ */
public GremlinServer(final Settings settings) {
this.settings = settings;
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop().join(), SERVER_THREAD_PREFIX + "shutdown"));
- final BasicThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(SERVER_THREAD_PREFIX + "boss-%d").build();
+ final ThreadFactory threadFactoryBoss = ThreadFactoryUtil.create("boss-%d");
bossGroup = new NioEventLoopGroup(settings.threadPoolBoss, threadFactoryBoss);
- final BasicThreadFactory threadFactoryWorker = new BasicThreadFactory.Builder().namingPattern(SERVER_THREAD_PREFIX + "worker-%d").build();
+ final ThreadFactory threadFactoryWorker = ThreadFactoryUtil.create("worker-%d");
workerGroup = new NioEventLoopGroup(settings.threadPoolWorker, threadFactoryWorker);
- final BasicThreadFactory threadFactoryGremlin = new BasicThreadFactory.Builder().namingPattern(SERVER_THREAD_PREFIX + "exec-%d").build();
- gremlinExecutorService = Executors.newFixedThreadPool(settings.gremlinPool, threadFactoryGremlin);
+ serverGremlinExecutor = new ServerGremlinExecutor<>(settings, null, workerGroup, EventLoopGroup.class);
+ gremlinExecutorService = serverGremlinExecutor.getGremlinExecutorService();
+ }
+
+ /**
+ * Construct a Gremlin Server instance from the {@link ServerGremlinExecutor} which internally carries some
+ * pre-constructed objects used by the server as well as the {@link Settings} object itself. This constructor
+ * is useful when Gremlin Server is being used in an embedded style and there is a need to share thread pools
+ * with the hosting application.
+ */
+ public GremlinServer(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor) {
+ this.serverGremlinExecutor = serverGremlinExecutor;
+ this.settings = serverGremlinExecutor.getSettings();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop().join(), SERVER_THREAD_PREFIX + "shutdown"));
+
+ final ThreadFactory threadFactoryBoss = ThreadFactoryUtil.create("boss-%d");
+ bossGroup = new NioEventLoopGroup(settings.threadPoolBoss, threadFactoryBoss);
+ workerGroup = serverGremlinExecutor.getScheduledExecutorService();
+ gremlinExecutorService = serverGremlinExecutor.getGremlinExecutorService();
}
/**
@@ -115,10 +127,9 @@ public class GremlinServer {
b.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, settings.writeBufferHighWaterMark);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- // get the executor initialized and fire off any lifecycle scripts that were provided by the user.
- // hooks get initialized during GremlinExecutor initialization
- final GremlinExecutor gremlinExecutor = initializeGremlinExecutor(gremlinExecutorService, workerGroup);
- hooks.forEach(hook -> {
+ // fire off any lifecycle scripts that were provided by the user. hooks get initialized during
+ // ServerGremlinExecutor initialization
+ serverGremlinExecutor.getHooks().forEach(hook -> {
logger.info("Executing start up {}", LifeCycleHook.class.getSimpleName());
try {
hook.onStartUp(new LifeCycleHook.Context(logger));
@@ -129,7 +140,7 @@ public class GremlinServer {
});
final Channelizer channelizer = createChannelizer(settings);
- channelizer.init(settings, gremlinExecutor, gremlinExecutorService, graphs.get(), workerGroup);
+ channelizer.init(serverGremlinExecutor);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(channelizer);
@@ -176,60 +187,6 @@ public class GremlinServer {
}
}
- private GremlinExecutor initializeGremlinExecutor(final ExecutorService gremlinExecutorService,
- final ScheduledExecutorService scheduledExecutorService) {
- // initialize graphs from configuration
- if (!graphs.isPresent()) graphs = Optional.of(new Graphs(settings));
-
- logger.info("Initialized Gremlin thread pool. Threads in pool named with pattern gremlin-*");
-
- final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
- .scriptEvaluationTimeout(settings.scriptEvaluationTimeout)
- .afterFailure((b, e) -> graphs.get().rollbackAll())
- .afterSuccess(b -> graphs.get().commitAll())
- .beforeEval(b -> graphs.get().rollbackAll())
- .afterTimeout(b -> graphs.get().rollbackAll())
- .enabledPlugins(new HashSet<>(settings.plugins))
- .globalBindings(graphs.get().getGraphsAsBindings())
- .promoteBindings(kv -> kv.getValue() instanceof Graph
- || kv.getValue() instanceof TraversalSource
- || kv.getValue() instanceof LifeCycleHook)
- .executorService(gremlinExecutorService)
- .scheduledExecutorService(scheduledExecutorService);
-
- settings.scriptEngines.forEach((k, v) -> {
- // make sure that server related classes are available at init
- v.imports.add(LifeCycleHook.class.getCanonicalName());
- gremlinExecutorBuilder.addEngineSettings(k, v.imports, v.staticImports, v.scripts, v.config);
- });
- final GremlinExecutor gremlinExecutor = gremlinExecutorBuilder.create();
-
- logger.info("Initialized GremlinExecutor and configured ScriptEngines.");
-
- // script engine init may have altered the graph bindings or maybe even created new ones - need to
- // re-apply those references back
- gremlinExecutor.getGlobalBindings().entrySet().stream()
- .filter(kv -> kv.getValue() instanceof Graph)
- .forEach(kv -> graphs.get().getGraphs().put(kv.getKey(), (Graph) kv.getValue()));
-
- // script engine init may have constructed the TraversalSource bindings - store them in Graphs object
- gremlinExecutor.getGlobalBindings().entrySet().stream()
- .filter(kv -> kv.getValue() instanceof TraversalSource)
- .forEach(kv -> {
- logger.info("A {} is now bound to [{}] with {}", kv.getValue().getClass().getSimpleName(), kv.getKey(), kv.getValue());
- graphs.get().getTraversalSources().put(kv.getKey(), (TraversalSource) kv.getValue());
- });
-
- // determine if the initialization scripts introduced LifeCycleHook objects - if so we need to gather them
- // up for execution
- hooks = gremlinExecutor.getGlobalBindings().entrySet().stream()
- .filter(kv -> kv.getValue() instanceof LifeCycleHook)
- .map(kv -> (LifeCycleHook) kv.getValue())
- .collect(Collectors.toList());
-
- return gremlinExecutor;
- }
-
/**
* Stop Gremlin Server and free the port binding. Note that multiple calls to this method will return the
* same instance of the {@link java.util.concurrent.CompletableFuture}.
@@ -266,11 +223,11 @@ public class GremlinServer {
// channel is shutdown as are the thread pools - time to kill graphs as nothing else should be acting on them
new Thread(() -> {
- hooks.forEach(hook -> {
+ serverGremlinExecutor.getHooks().forEach(hook -> {
logger.info("Executing shutdown {}", LifeCycleHook.class.getSimpleName());
try {
hook.onShutDown(new LifeCycleHook.Context(logger));
- } catch (UnsupportedOperationException | UndeclaredThrowableException uoe) {
+ } catch (UnsupportedOperationException | UndeclaredThrowableException uoe) {
// if the user doesn't implement onShutDown the scriptengine will throw
// this exception. it can safely be ignored.
}
@@ -288,7 +245,7 @@ public class GremlinServer {
logger.warn("Timeout waiting for bossy/worker thread pools to shutdown - continuing with shutdown process.");
}
- graphs.ifPresent(gs -> gs.getGraphs().forEach((k, v) -> {
+ serverGremlinExecutor.getGraphs().getGraphs().forEach((k, v) -> {
logger.debug("Closing Graph instance [{}]", k);
try {
v.close();
@@ -297,7 +254,7 @@ public class GremlinServer {
} finally {
logger.info("Closed Graph instance [{}]", k);
}
- }));
+ });
logger.info("Gremlin Server - shutdown complete");
serverStopped.complete(null);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
index e36345f..06c6bdf 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
@@ -18,25 +18,18 @@
*/
package org.apache.tinkerpop.gremlin.server.channel;
-import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import io.netty.channel.EventLoopGroup;
import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
-import org.apache.tinkerpop.gremlin.server.Graphs;
-import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-
/**
* Constructs a {@link org.apache.tinkerpop.gremlin.server.Channelizer} that exposes an HTTP/REST endpoint in Gremlin Server.
*
@@ -48,10 +41,8 @@ public class HttpChannelizer extends AbstractChannelizer {
private HttpGremlinEndpointHandler httpGremlinEndpointHandler;
@Override
- public void init(final Settings settings, final GremlinExecutor gremlinExecutor,
- final ExecutorService gremlinExecutorService, final Graphs graphs,
- final ScheduledExecutorService scheduledExecutorService) {
- super.init(settings, gremlinExecutor, gremlinExecutorService, graphs, scheduledExecutorService);
+ public void init(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor) {
+ super.init(serverGremlinExecutor);
httpGremlinEndpointHandler = new HttpGremlinEndpointHandler(serializers, gremlinExecutor);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
index 1ac8e7e..0b95e81 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
@@ -18,21 +18,17 @@
*/
package org.apache.tinkerpop.gremlin.server.channel;
-import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import io.netty.channel.EventLoopGroup;
import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
-import org.apache.tinkerpop.gremlin.server.Graphs;
-import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.handler.NioGremlinBinaryRequestDecoder;
import org.apache.tinkerpop.gremlin.server.handler.NioGremlinResponseEncoder;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-
/**
* A {@link org.apache.tinkerpop.gremlin.server.Channelizer} that exposes an NIO-based Gremlin endpoint with a custom
* protocol.
@@ -45,10 +41,8 @@ public class NioChannelizer extends AbstractChannelizer {
private NioGremlinBinaryRequestDecoder nioGremlinBinaryRequestDecoder;
@Override
- public void init(final Settings settings, final GremlinExecutor gremlinExecutor,
- final ExecutorService gremlinExecutorService, final Graphs graphs,
- final ScheduledExecutorService scheduledExecutorService) {
- super.init(settings, gremlinExecutor, gremlinExecutorService, graphs, scheduledExecutorService);
+ public void init(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor) {
+ super.init(serverGremlinExecutor);
nioGremlinBinaryRequestDecoder = new NioGremlinBinaryRequestDecoder(serializers);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
index 86076f0..23d7873 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/WebSocketChannelizer.java
@@ -18,10 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.server.channel;
-import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import io.netty.channel.EventLoopGroup;
import org.apache.tinkerpop.gremlin.server.AbstractChannelizer;
-import org.apache.tinkerpop.gremlin.server.Graphs;
-import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinBinaryRequestDecoder;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinResponseEncoder;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinTextRequestDecoder;
@@ -32,12 +30,10 @@ import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
+import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-
/**
* A {@link org.apache.tinkerpop.gremlin.server.Channelizer} that exposes a WebSocket-based Gremlin endpoint with a custom
* sub-protocol.
@@ -52,10 +48,8 @@ public class WebSocketChannelizer extends AbstractChannelizer {
private WsGremlinBinaryRequestDecoder wsGremlinBinaryRequestDecoder;
@Override
- public void init(final Settings settings, final GremlinExecutor gremlinExecutor,
- final ExecutorService gremlinExecutorService, final Graphs graphs,
- final ScheduledExecutorService scheduledExecutorService) {
- super.init(settings, gremlinExecutor, gremlinExecutorService, graphs, scheduledExecutorService);
+ public void init(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor) {
+ super.init(serverGremlinExecutor);
wsGremlinResponseEncoder = new WsGremlinResponseEncoder();
wsGremlinTextRequestDecoder = new WsGremlinTextRequestDecoder(serializers);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
new file mode 100644
index 0000000..8d82d53
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.server.Graphs;
+import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.stream.Collectors;
+
+/**
+ * The core of script execution in Gremlin Server. Given {@link Settings} and optionally other arguments, this
+ * class will construct a {@link GremlinExecutor} to be used by Gremlin Server. Those expecting to build their
+ * own version of Gremlin Server might consider building their implementation around this class as it provides some
+ * basic infrastructure required for most of Gremlin Server script processing features. Those embedding Gremlin
+ * Server in another application might consider using this class to initialize the {@link GremlinServer} class
+ * as it will allow sharing of thread pool resources.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public class ServerGremlinExecutor<T extends ScheduledExecutorService> {
+ private static final Logger logger = LoggerFactory.getLogger(ServerGremlinExecutor.class);
+
+ private final Graphs graphs;
+ private final Settings settings;
+ private final List<LifeCycleHook> hooks;
+
+ private final T scheduledExecutorService;
+ private final ExecutorService gremlinExecutorService;
+ private final GremlinExecutor gremlinExecutor;
+
+ /**
+ * Create a new object from {@link Settings} where thread pools are internally created. Note that the scheduled
+ * executor is created via {@link Executors#newScheduledThreadPool(int, ThreadFactory)} and then cast to
+ * {@code scheduleExecutorServiceClass}, so that class must be assignable from the pool that method returns.
+ */
+ public ServerGremlinExecutor(final Settings settings, final Class<T> scheduleExecutorServiceClass) {
+ this(settings, null, null, scheduleExecutorServiceClass);
+ }
+
+ /**
+ * Create a new object from {@link Settings}, optionally reusing thread pools supplied by the caller. A {@code null}
+ * {@code gremlinExecutorService} is replaced by a fixed pool of {@link Settings#gremlinPool} threads. A {@code null}
+ * {@code scheduledExecutorService} is created via {@link Executors#newScheduledThreadPool(int, ThreadFactory)} and cast
+ * to {@code scheduleExecutorServiceClass}. When an instance is supplied, the corresponding pool size setting is not used.
+ */
+ public ServerGremlinExecutor(final Settings settings, final ExecutorService gremlinExecutorService,
+ final T scheduledExecutorService, final Class<T> scheduleExecutorServiceClass) {
+ this.settings = settings;
+
+ if (null == gremlinExecutorService) {
+ final ThreadFactory threadFactoryGremlin = ThreadFactoryUtil.create("exec-%d");
+ this.gremlinExecutorService = Executors.newFixedThreadPool(settings.gremlinPool, threadFactoryGremlin);
+ } else {
+ this.gremlinExecutorService = gremlinExecutorService;
+ }
+
+ if (null == scheduledExecutorService) {
+ final ThreadFactory threadFactoryGremlin = ThreadFactoryUtil.create("worker-%d");
+ this.scheduledExecutorService = scheduleExecutorServiceClass.cast(
+ Executors.newScheduledThreadPool(settings.threadPoolWorker, threadFactoryGremlin));
+ } else {
+ this.scheduledExecutorService = scheduledExecutorService;
+ }
+
+ // initialize graphs from configuration
+ graphs = new Graphs(settings);
+
+ logger.info("Initialized Gremlin thread pool. Threads in pool named with pattern gremlin-*");
+
+ final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
+ .scriptEvaluationTimeout(settings.scriptEvaluationTimeout)
+ .afterFailure((b, e) -> graphs.rollbackAll())
+ .afterSuccess(b -> graphs.commitAll())
+ .beforeEval(b -> graphs.rollbackAll())
+ .afterTimeout(b -> graphs.rollbackAll())
+ .enabledPlugins(new HashSet<>(settings.plugins))
+ .globalBindings(graphs.getGraphsAsBindings())
+ .promoteBindings(kv -> kv.getValue() instanceof Graph
+ || kv.getValue() instanceof TraversalSource
+ || kv.getValue() instanceof LifeCycleHook)
+ .executorService(this.gremlinExecutorService)
+ .scheduledExecutorService(this.scheduledExecutorService);
+
+ settings.scriptEngines.forEach((k, v) -> {
+ // make sure that server related classes are available at init
+ v.imports.add(LifeCycleHook.class.getCanonicalName());
+ gremlinExecutorBuilder.addEngineSettings(k, v.imports, v.staticImports, v.scripts, v.config);
+ });
+
+ gremlinExecutor = gremlinExecutorBuilder.create();
+
+ logger.info("Initialized GremlinExecutor and configured ScriptEngines.");
+
+ // script engine init may have altered the graph bindings or maybe even created new ones - need to
+ // re-apply those references back
+ gremlinExecutor.getGlobalBindings().entrySet().stream()
+ .filter(kv -> kv.getValue() instanceof Graph)
+ .forEach(kv -> graphs.getGraphs().put(kv.getKey(), (Graph) kv.getValue()));
+
+ // script engine init may have constructed the TraversalSource bindings - store them in Graphs object
+ gremlinExecutor.getGlobalBindings().entrySet().stream()
+ .filter(kv -> kv.getValue() instanceof TraversalSource)
+ .forEach(kv -> {
+ logger.info("A {} is now bound to [{}] with {}", kv.getValue().getClass().getSimpleName(), kv.getKey(), kv.getValue());
+ graphs.getTraversalSources().put(kv.getKey(), (TraversalSource) kv.getValue());
+ });
+
+ // determine if the initialization scripts introduced LifeCycleHook objects - if so we need to gather them
+ // up for execution
+ hooks = gremlinExecutor.getGlobalBindings().entrySet().stream()
+ .filter(kv -> kv.getValue() instanceof LifeCycleHook)
+ .map(kv -> (LifeCycleHook) kv.getValue())
+ .collect(Collectors.toList());
+ }
+
+ public T getScheduledExecutorService() {
+ return scheduledExecutorService;
+ }
+
+ public GremlinExecutor getGremlinExecutor() {
+ return gremlinExecutor;
+ }
+
+ public ExecutorService getGremlinExecutorService() {
+ return gremlinExecutorService;
+ }
+
+ public Graphs getGraphs() {
+ return graphs;
+ }
+
+ public Settings getSettings() {
+ return settings;
+ }
+
+ public List<LifeCycleHook> getHooks() {
+ return hooks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ed8b4746/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ThreadFactoryUtil.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ThreadFactoryUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ThreadFactoryUtil.java
new file mode 100644
index 0000000..a7af545
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ThreadFactoryUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ */
+public final class ThreadFactoryUtil {
+ private static final String SERVER_THREAD_PREFIX = "gremlin-server-";
+ private ThreadFactoryUtil() {}
+
+ public static ThreadFactory create(final String pattern) {
+ return new BasicThreadFactory.Builder().namingPattern(SERVER_THREAD_PREFIX + pattern).build();
+ }
+}