You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/08/10 20:19:12 UTC
tinkerpop git commit: Make TraversalOpProcessor a bit more extensible.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1278 6efca4ad0 -> 5ff0fe7a5
Make TraversalOpProcessor a bit more extensible.
Other OpProcessors already allow for extensibility through methods similar to those now exposed in TraversalOpProcessor.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5ff0fe7a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5ff0fe7a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5ff0fe7a
Branch: refs/heads/TINKERPOP-1278
Commit: 5ff0fe7a5c90b0ef447468692e6a23b403f159b3
Parents: 6efca4a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Aug 10 15:12:22 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed Aug 10 15:12:22 2016 -0400
----------------------------------------------------------------------
.../op/traversal/TraversalOpProcessor.java | 50 +++++++++++++-------
1 file changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5ff0fe7a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 9d59149..9725b36 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -220,7 +220,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
private static Map<String, String> validateTraversalRequest(final RequestMessage message) throws OpProcessorException {
if (!message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) {
- final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_GREMLIN);
+ final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_GREMLIN);
throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
}
@@ -232,12 +232,12 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
private static Optional<Map<String, String>> validatedAliases(RequestMessage message) throws OpProcessorException {
final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
if (!aliases.isPresent()) {
- final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+ final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
}
if (aliases.get().size() != 1) {
- final String msg = String.format("A message with an [%s] op code requires the [%s] argument to be a Map containing one alias assignment.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+ final String msg = String.format("A message with [%s] op code requires the [%s] argument to be a Map containing one alias assignment.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
}
return aliases;
@@ -260,11 +260,10 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
try {
final ChannelHandlerContext ctx = context.getChannelHandlerContext();
final Graph graph = g.getGraph();
- final boolean supportsTransactions = graph.features().graph().supportsTransactions();
context.getGremlinExecutor().getExecutorService().submit(() -> {
try {
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ beforeProcessing(graph, context);
try {
final TraversalSideEffects sideEffects = cache.getIfPresent(sideEffect.get());
@@ -273,7 +272,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
final String errorMessage = String.format("Could not find side-effects for %s.", sideEffect.get());
logger.warn(errorMessage);
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ onError(graph, context);
return;
}
@@ -282,22 +281,20 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
logger.warn(errorMessage);
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ onError(graph, context);
return;
} catch (Exception ex) {
logger.warn(String.format("Exception processing a side-effect on iteration for request [%s].", msg.getRequestId()), ex);
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ onError(graph, context);
return;
}
- // there was no "writing" here, just side-effect retrieval, so if a transaction was opened then
- // just close with rollback
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ onSideEffectSuccess(graph, context);
} catch (Exception ex) {
logger.warn(String.format("Exception processing a side-effect on request [%s].", msg.getRequestId()), ex);
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
- if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+ onError(graph, context);
} finally {
timerContext.stop();
}
@@ -344,11 +341,10 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
try {
final ChannelHandlerContext ctx = context.getChannelHandlerContext();
final Graph graph = g.getGraph();
- final boolean supportsTransactions = graph.features().graph().supportsTransactions();
context.getGremlinExecutor().getExecutorService().submit(() -> {
try {
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ beforeProcessing(graph, context);
try {
// compile the traversal - without it getEndStep() has nothing in it
@@ -361,20 +357,20 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
logger.warn(errorMessage);
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ onError(graph, context);
return;
} catch (Exception ex) {
logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex);
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+ onError(graph, context);
return;
}
- if (supportsTransactions && graph.tx().isOpen()) graph.tx().commit();
+ onTraversalSuccess(graph, context);
} catch (Exception ex) {
logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex);
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
- if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+ onError(graph, context);
} finally {
timerContext.stop();
}
@@ -387,6 +383,24 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
}
}
+ protected void beforeProcessing(final Graph graph, final Context ctx) {
+ if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+ }
+
+ protected void onError(final Graph graph, final Context ctx) {
+ if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+ }
+
+ protected void onTraversalSuccess(final Graph graph, final Context ctx) {
+ if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().commit();
+ }
+
+ protected void onSideEffectSuccess(final Graph graph, final Context ctx) {
+ // there was no "writing" here, just side-effect retrieval, so if a transaction was opened then
+ // just close with rollback
+ if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+ }
+
@Override
protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
final ResponseStatusCode code, final Iterator itty) {