You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/05/27 22:44:36 UTC

[1/2] incubator-tinkerpop git commit: Iteration of results in Gremlin Server now occur in the same thread as eval. TINKERPOP3-704

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 6faea9038 -> 050eba77a


Iteration of results in Gremlin Server now occur in the same thread as eval. TINKERPOP3-704


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fcfc6ead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fcfc6ead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fcfc6ead

Branch: refs/heads/master
Commit: fcfc6ead082c97c4cad1394cedbed3a19aa558b2
Parents: 6faea90
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed May 27 16:40:50 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 27 16:41:49 2015 -0400

----------------------------------------------------------------------
 .../gremlin/groovy/engine/GremlinExecutor.java  | 75 ++++++++++++++++++--
 .../server/op/AbstractEvalOpProcessor.java      | 53 ++++++--------
 2 files changed, 92 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fcfc6ead/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index e9945c0..884eb39 100644
--- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -143,33 +143,94 @@ public class GremlinExecutor implements AutoCloseable {
         }
     }
 
+    /**
+     * Evaluate a script with empty bindings.
+     */
     public CompletableFuture<Object> eval(final String script) {
         return eval(script, null, new SimpleBindings());
     }
 
+    /**
+     * Evaluate a script with specified bindings.
+     */
     public CompletableFuture<Object> eval(final String script, final Bindings boundVars) {
         return eval(script, null, boundVars);
     }
 
+    /**
+     * Evaluate a script with a {@link Map} of bindings.
+     */
     public CompletableFuture<Object> eval(final String script, final Map<String, Object> boundVars) {
         return eval(script, null, new SimpleBindings(boundVars));
     }
 
+    /**
+     * Evaluate a script.
+     *
+     * @param script the script to evaluate
+     * @param language the language to evaluate it in
+     * @param boundVars the bindings as a {@link Map} to evaluate in the context of the script
+     */
     public CompletableFuture<Object> eval(final String script, final String language, final Map<String, Object> boundVars) {
         return eval(script, language, new SimpleBindings(boundVars));
     }
 
+    /**
+     * Evaluate a script.
+     *
+     * @param script the script to evaluate
+     * @param language the language to evaluate it in
+     * @param boundVars the bindings to evaluate in the context of the script
+     */
     public CompletableFuture<Object> eval(final String script, final String language, final Bindings boundVars) {
-        return eval(script, language, boundVars, null);
+        return eval(script, language, boundVars, null, null);
     }
 
+    /**
+     * Evaluate a script and allow for the submission of a transform {@link Function} that will transform the
+     * result after script evaluates but before transaction commit and before the returned {@link CompletableFuture}
+     * is completed.
+     *
+     * @param script the script to evaluate
+     * @param language the language to evaluate it in
+     * @param boundVars the bindings to evaluate in the context of the script
+     * @param transformResult a {@link Function} that transforms the result - can be {@code null}
+     */
     public CompletableFuture<Object> eval(final String script, final String language, final Map<String, Object> boundVars,
                                           final Function<Object, Object> transformResult) {
-        return eval(script, language, new SimpleBindings(boundVars), transformResult);
+        return eval(script, language, new SimpleBindings(boundVars), transformResult, null);
+    }
+
+    /**
+     * Evaluate a script and allow for the submission of a {@link Consumer} that will take the result for additional
+     * processing after the script evaluates and after the {@link CompletableFuture} is completed, but before the
+     * transaction is committed.
+     *
+     * @param script the script to evaluate
+     * @param language the language to evaluate it in
+     * @param boundVars the bindings to evaluate in the context of the script
+     * @param withResult a {@link Consumer} that accepts the result - can be {@code null}
+     */
+    public CompletableFuture<Object> eval(final String script, final String language, final Map<String, Object> boundVars,
+                                          final Consumer<Object> withResult) {
+        return eval(script, language, new SimpleBindings(boundVars), null, withResult);
     }
 
+    /**
+     * Evaluate a script and allow for the submission of both a transform {@link Function} and {@link Consumer}.
+     * The {@link Function} will transform the result after script evaluates but before transaction commit and before
+     * the returned {@link CompletableFuture} is completed. The {@link Consumer} will take the result for additional
+     * processing after the script evaluates and after the {@link CompletableFuture} is completed, but before the
+     * transaction is committed.
+     *
+     * @param script the script to evaluate
+     * @param language the language to evaluate it in
+     * @param boundVars the bindings to evaluate in the context of the script
+     * @param transformResult a {@link Function} that transforms the result - can be {@code null}
+     * @param withResult a {@link Consumer} that accepts the result - can be {@code null}
+     */
     public CompletableFuture<Object> eval(final String script, final String language, final Bindings boundVars,
-                                          final Function<Object, Object> transformResult) {
+                                          final Function<Object, Object> transformResult, final Consumer<Object> withResult) {
         final String lang = Optional.ofNullable(language).orElse("gremlin-groovy");
 
         logger.debug("Preparing to evaluate script - {} - in thread [{}]", script, Thread.currentThread().getName());
@@ -189,7 +250,13 @@ public class GremlinExecutor implements AutoCloseable {
                 // apply a transformation before sending back the result - useful when trying to force serialization
                 // in the same thread that the eval took place given ThreadLocal nature of graphs as well as some
                 // transactional constraints
-                evaluationFuture.complete(null == transformResult ? o : transformResult.apply(o));
+                final Object result = null == transformResult ? o : transformResult.apply(o);
+                evaluationFuture.complete(result);
+
+                // a mechanism for taking the final result and doing something with it in the same thread, but
+                // AFTER the eval and transform are done and that future completed.  this provides a final means
+                // for working with the result in the same thread as it was eval'd
+                if (withResult != null) withResult.accept(result);
 
                 afterSuccess.accept(bindings);
             } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fcfc6ead/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index 86941ac..ece1f79 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -19,11 +19,9 @@
 package org.apache.tinkerpop.gremlin.server.op;
 
 import com.codahale.metrics.Timer;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.driver.message.ResponseStatus;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import org.apache.tinkerpop.gremlin.structure.T;
@@ -47,10 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 import static com.codahale.metrics.MetricRegistry.name;
@@ -137,7 +133,6 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
         final ChannelHandlerContext ctx = context.getChannelHandlerContext();
         final RequestMessage msg = context.getRequestMessage();
         final GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
-        final ExecutorService executor = gremlinExecutor.getExecutorService();
 
         final Map<String, Object> args = msg.getArgs();
 
@@ -145,45 +140,39 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
         final String language = args.containsKey(Tokens.ARGS_LANGUAGE) ? (String) args.get(Tokens.ARGS_LANGUAGE) : null;
         final Bindings bindings = bindingsSupplier.get();
 
-        final CompletableFuture<Object> evalFuture = gremlinExecutor.eval(script, language, bindings);
-        evalFuture.handle((v, t) -> timerContext.stop());
-
-        final AtomicBoolean iterationFired = new AtomicBoolean(false);
-        final CompletableFuture<Void> iterationFuture = evalFuture.thenAcceptAsync(o -> {
-            iterationFired.set(true);
+        final CompletableFuture<Object> evalFuture = gremlinExecutor.eval(script, language, bindings, null, o -> {
             final Iterator itty = IteratorUtils.asIterator(o);
 
             logger.debug("Preparing to iterate results from - {} - in thread [{}]", msg, Thread.currentThread().getName());
 
             try {
                 handleIterator(context, itty);
-            } catch (Exception te) {
-                throw new RuntimeException(te);
+            } catch (TimeoutException ex) {
+                final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg, ex.getMessage());
+                logger.warn(errorMessage);
+                ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
+            } catch (Exception ex) {
+                logger.warn(String.format("Exception processing a script on request [%s].", msg), ex);
+                ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
             }
-        }, executor);
-
-        iterationFuture.handleAsync((o, ex) -> {
-            // iteration has completed
-            if (ex != null) {
-                // an exception could have fired in the evalFuture - it bubbles up here so we have to
-                // check for the iteration exception or something more general.
-                final Throwable root = ExceptionUtils.getRootCause(ex);
-                if (!iterationFired.get()) {
-                    // iteration wasn't executed so this must have raised during the eval process
-                    logger.warn(String.format("Exception processing a script on request [%s].", msg), root);
-                    ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION).statusMessage(root.getMessage()).create());
-                    return null;
-                } else if (root.getClass().equals(TimeoutException.class)) {
-                    final String errorMessage = String.format("Response iteration and serialization exceeded the configured threshold for request [%s] - %s", msg, ex.getCause().getMessage());
+        });
+
+        evalFuture.handle((v, t) -> {
+            timerContext.stop();
+
+            if (t != null) {
+                if (t instanceof TimeoutException) {
+                    final String errorMessage = String.format("Response evaluation exceeded the configured threshold for request [%s] - %s", msg, t.getMessage());
                     logger.warn(errorMessage);
-                    ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
+                    ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(t.getMessage()).create());
                 } else {
-                    logger.warn(String.format("Exception processing a script on request [%s].", msg), root);
-                    ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(root.getMessage()).create());
+                    logger.warn(String.format("Exception processing a script on request [%s].", msg), t);
+                    ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION).statusMessage(t.getMessage()).create());
                 }
             }
+
             return null;
-        }, executor);
+        });
     }
 
     /**


[2/2] incubator-tinkerpop git commit: Update changelog.

Posted by sp...@apache.org.
Update changelog.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/050eba77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/050eba77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/050eba77

Branch: refs/heads/master
Commit: 050eba77ac70695ec5aadda3bb6e850c8999876b
Parents: fcfc6ea
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed May 27 16:43:33 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 27 16:43:33 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/050eba77/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 330b5f6..0c098ef 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,7 @@ image::http://www.tinkerpop.com/docs/current/images/gremlin-hindu.png[width=225]
 TinkerPop 3.0.0.GA (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Iteration of results in Gremlin Server occur in the same thread as evaluation and prior to transaction close.
 * `GraphComputerTest` extended with validation of the semantics of all `ResultGraph`/`Persist` combinations.
 * GiraphGraphComputer no longer requires an extra iteration and MapReduce job to derive the full `Memory` result.
 * SparkGraphComputer now supports `InputRDD` to allow vendors/users to use a `SparkContext` to generate the adjacency list representation.