You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/08/10 20:19:12 UTC

tinkerpop git commit: Make TraversalOpProcessor a bit more extensible.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1278 6efca4ad0 -> 5ff0fe7a5


Make TraversalOpProcessor a bit more extensible.

Other OpProcessors allow for extensibility through methods similar to those now exposed in TraversalOpProcessor.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5ff0fe7a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5ff0fe7a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5ff0fe7a

Branch: refs/heads/TINKERPOP-1278
Commit: 5ff0fe7a5c90b0ef447468692e6a23b403f159b3
Parents: 6efca4a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Aug 10 15:12:22 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed Aug 10 15:12:22 2016 -0400

----------------------------------------------------------------------
 .../op/traversal/TraversalOpProcessor.java      | 50 +++++++++++++-------
 1 file changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5ff0fe7a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 9d59149..9725b36 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -220,7 +220,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
 
     private static Map<String, String> validateTraversalRequest(final RequestMessage message) throws OpProcessorException {
         if (!message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) {
-            final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_GREMLIN);
+            final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_GREMLIN);
             throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
         }
 
@@ -232,12 +232,12 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
     private static Optional<Map<String, String>> validatedAliases(RequestMessage message) throws OpProcessorException {
         final Optional<Map<String, String>> aliases = message.optionalArgs(Tokens.ARGS_ALIASES);
         if (!aliases.isPresent()) {
-            final String msg = String.format("A message with an [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+            final String msg = String.format("A message with [%s] op code requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
             throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
         }
 
         if (aliases.get().size() != 1) {
-            final String msg = String.format("A message with an [%s] op code requires the [%s] argument to be a Map containing one alias assignment.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+            final String msg = String.format("A message with [%s] op code requires the [%s] argument to be a Map containing one alias assignment.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
             throw new OpProcessorException(msg, ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
         }
         return aliases;
@@ -260,11 +260,10 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
         try {
             final ChannelHandlerContext ctx = context.getChannelHandlerContext();
             final Graph graph = g.getGraph();
-            final boolean supportsTransactions = graph.features().graph().supportsTransactions();
 
             context.getGremlinExecutor().getExecutorService().submit(() -> {
                 try {
-                    if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                    beforeProcessing(graph, context);
 
                     try {
                         final TraversalSideEffects sideEffects = cache.getIfPresent(sideEffect.get());
@@ -273,7 +272,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                             final String errorMessage = String.format("Could not find side-effects for %s.", sideEffect.get());
                             logger.warn(errorMessage);
                             ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
-                            if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                            onError(graph, context);
                             return;
                         }
 
@@ -282,22 +281,20 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                         final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
                         logger.warn(errorMessage);
                         ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
-                        if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                        onError(graph, context);
                         return;
                     } catch (Exception ex) {
                         logger.warn(String.format("Exception processing a side-effect on iteration for request [%s].", msg.getRequestId()), ex);
                         ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
-                        if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                        onError(graph, context);
                         return;
                     }
 
-                    // there was no "writing" here, just side-effect retrieval, so if a transaction was opened then
-                    // just close with rollback
-                    if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                    onSideEffectSuccess(graph, context);
                 } catch (Exception ex) {
                     logger.warn(String.format("Exception processing a side-effect on request [%s].", msg.getRequestId()), ex);
                     ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
-                    if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+                    onError(graph, context);
                 } finally {
                     timerContext.stop();
                 }
@@ -344,11 +341,10 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
         try {
             final ChannelHandlerContext ctx = context.getChannelHandlerContext();
             final Graph graph = g.getGraph();
-            final boolean supportsTransactions = graph.features().graph().supportsTransactions();
 
             context.getGremlinExecutor().getExecutorService().submit(() -> {
                 try {
-                    if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                    beforeProcessing(graph, context);
 
                     try {
                         // compile the traversal - without it getEndStep() has nothing in it
@@ -361,20 +357,20 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
                         final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage());
                         logger.warn(errorMessage);
                         ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
-                        if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                        onError(graph, context);
                         return;
                     } catch (Exception ex) {
                         logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex);
                         ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
-                        if (supportsTransactions && graph.tx().isOpen()) graph.tx().rollback();
+                        onError(graph, context);
                         return;
                     }
 
-                    if (supportsTransactions && graph.tx().isOpen()) graph.tx().commit();
+                    onTraversalSuccess(graph, context);
                 } catch (Exception ex) {
                     logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex);
                     ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
-                    if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+                    onError(graph, context);
                 } finally {
                     timerContext.stop();
                 }
@@ -387,6 +383,24 @@ public class TraversalOpProcessor extends AbstractOpProcessor {
         }
     }
 
+    protected void beforeProcessing(final Graph graph, final Context ctx) {
+        if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+    }
+
+    protected void onError(final Graph graph, final Context ctx) {
+        if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+    }
+
+    protected void onTraversalSuccess(final Graph graph, final Context ctx) {
+        if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().commit();
+    }
+
+    protected void onSideEffectSuccess(final Graph graph, final Context ctx) {
+        // there was no "writing" here, just side-effect retrieval, so if a transaction was opened then
+        // just close with rollback
+        if (graph.features().graph().supportsTransactions() && graph.tx().isOpen()) graph.tx().rollback();
+    }
+
     @Override
     protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg,
                                                    final ResponseStatusCode code, final Iterator itty) {