You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/05/27 22:44:36 UTC
[1/2] incubator-tinkerpop git commit: Iteration of results in Gremlin
Server now occur in the same thread as eval. TINKERPOP3-704
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 6faea9038 -> 050eba77a
Iteration of results in Gremlin Server now occur in the same thread as eval. TINKERPOP3-704
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fcfc6ead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fcfc6ead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fcfc6ead
Branch: refs/heads/master
Commit: fcfc6ead082c97c4cad1394cedbed3a19aa558b2
Parents: 6faea90
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed May 27 16:40:50 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 27 16:41:49 2015 -0400
----------------------------------------------------------------------
.../gremlin/groovy/engine/GremlinExecutor.java | 75 ++++++++++++++++++--
.../server/op/AbstractEvalOpProcessor.java | 53 ++++++--------
2 files changed, 92 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fcfc6ead/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index e9945c0..884eb39 100644
--- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -143,33 +143,94 @@ public class GremlinExecutor implements AutoCloseable {
}
}
+ /**
+ * Evaluate a script with empty bindings.
+ */
public CompletableFuture<Object> eval(final String script) {
return eval(script, null, new SimpleBindings());
}
+ /**
+ * Evaluate a script with specified bindings.
+ */
public CompletableFuture<Object> eval(final String script, final Bindings boundVars) {
return eval(script, null, boundVars);
}
+ /**
+ * Evaluate a script with a {@link Map} of bindings.
+ */
public CompletableFuture<Object> eval(final String script, final Map<String, Object> boundVars) {
return eval(script, null, new SimpleBindings(boundVars));
}
+ /**
+ * Evaluate a script.
+ *
+ * @param script the script to evaluate
+ * @param language the language to evaluate it in
+ * @param boundVars the bindings as a {@link Map} to evaluate in the context of the script
+ */
public CompletableFuture<Object> eval(final String script, final String language, final Map<String, Object> boundVars) {
return eval(script, language, new SimpleBindings(boundVars));
}
+ /**
+ * Evaluate a script.
+ *
+ * @param script the script to evaluate
+ * @param language the language to evaluate it in
+ * @param boundVars the bindings to evaluate in the context of the script
+ */
public CompletableFuture<Object> eval(final String script, final String language, final Bindings boundVars) {
- return eval(script, language, boundVars, null);
+ return eval(script, language, boundVars, null, null);
}
+ /**
+ * Evaluate a script and allow for the submission of a transform {@link Function} that will transform the
+ * result after script evaluates but before transaction commit and before the returned {@link CompletableFuture}
+ * is completed.
+ *
+ * @param script the script to evaluate
+ * @param language the language to evaluate it in
+ * @param boundVars the bindings to evaluate in the context of the script
+ * @param transformResult a {@link Function} that transforms the result - can be {@code null}
+ */
public CompletableFuture<Object> eval(final String script, final String language, final Map<String, Object> boundVars,
final Function<Object, Object> transformResult) {
- return eval(script, language, new SimpleBindings(boundVars), transformResult);
+ return eval(script, language, new SimpleBindings(boundVars), transformResult, null);
+ }
+
+ /**
+ * Evaluate a script and allow for the submission of a {@link Consumer} that will take the result for additional
+ * processing after the script evaluates and after the {@link CompletableFuture} is completed, but before the
+ * transaction is committed.
+ *
+ * @param script the script to evaluate
+ * @param language the language to evaluate it in
+ * @param boundVars the bindings to evaluate in the context of the script
+ * @param withResult a {@link Consumer} that accepts the result - can be {@code null}
+ */
+ public CompletableFuture<Object> eval(final String script, final String language, final Map<String, Object> boundVars,
+ final Consumer<Object> withResult) {
+ return eval(script, language, new SimpleBindings(boundVars), null, withResult);
}
+ /**
+ * Evaluate a script and allow for the submission of both a transform {@link Function} and {@link Consumer}.
+ * The {@link Function} will transform the result after script evaluates but before transaction commit and before
+ * the returned {@link CompletableFuture} is completed. The {@link Consumer} will take the result for additional
+ * processing after the script evaluates and after the {@link CompletableFuture} is completed, but before the
+ * transaction is committed.
+ *
+ * @param script the script to evaluate
+ * @param language the language to evaluate it in
+ * @param boundVars the bindings to evaluate in the context of the script
+ * @param transformResult a {@link Function} that transforms the result - can be {@code null}
+ * @param withResult a {@link Consumer} that accepts the result - can be {@code null}
+ */
public CompletableFuture<Object> eval(final String script, final String language, final Bindings boundVars,
- final Function<Object, Object> transformResult) {
+ final Function<Object, Object> transformResult, final Consumer<Object> withResult) {
final String lang = Optional.ofNullable(language).orElse("gremlin-groovy");
logger.debug("Preparing to evaluate script - {} - in thread [{}]", script, Thread.currentThread().getName());
@@ -189,7 +250,13 @@ public class GremlinExecutor implements AutoCloseable {
// apply a transformation before sending back the result - useful when trying to force serialization
// in the same thread that the eval took place given ThreadLocal nature of graphs as well as some
// transactional constraints
- evaluationFuture.complete(null == transformResult ? o : transformResult.apply(o));
+ final Object result = null == transformResult ? o : transformResult.apply(o);
+ evaluationFuture.complete(result);
+
+ // a mechanism for taking the final result and doing something with it in the same thread, but
+ // AFTER the eval and transform are done and that future completed. this provides a final means
+ // for working with the result in the same thread as it was eval'd
+ if (withResult != null) withResult.accept(result);
afterSuccess.accept(bindings);
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fcfc6ead/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index 86941ac..ece1f79 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -19,11 +19,9 @@
package org.apache.tinkerpop.gremlin.server.op;
import com.codahale.metrics.Timer;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.driver.message.ResponseStatus;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.structure.T;
@@ -47,10 +45,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import static com.codahale.metrics.MetricRegistry.name;
@@ -137,7 +133,6 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
final ChannelHandlerContext ctx = context.getChannelHandlerContext();
final RequestMessage msg = context.getRequestMessage();
final GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
- final ExecutorService executor = gremlinExecutor.getExecutorService();
final Map<String, Object> args = msg.getArgs();
@@ -145,45 +140,39 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
final String language = args.containsKey(Tokens.ARGS_LANGUAGE) ? (String) args.get(Tokens.ARGS_LANGUAGE) : null;
final Bindings bindings = bindingsSupplier.get();
- final CompletableFuture<Object> evalFuture = gremlinExecutor.eval(script, language, bindings);
- evalFuture.handle((v, t) -> timerContext.stop());
-
- final AtomicBoolean iterationFired = new AtomicBoolean(false);
- final CompletableFuture<Void> iterationFuture = evalFuture.thenAcceptAsync(o -> {
- iterationFired.set(true);
+ final CompletableFuture<Object> evalFuture = gremlinExecutor.eval(script, language, bindings, null, o -> {
final Iterator itty = IteratorUtils.asIterator(o);
logger.debug("Preparing to iterate results from - {} - in thread [{}]", msg, Thread.currentThread().getName());
try {
handleIterator(context, itty);
- } catch (Exception te) {
- throw new RuntimeException(te);
+ } catch (TimeoutException ex) {
+ final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg, ex.getMessage());
+ logger.warn(errorMessage);
+ ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
+ } catch (Exception ex) {
+ logger.warn(String.format("Exception processing a script on request [%s].", msg), ex);
+ ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
}
- }, executor);
-
- iterationFuture.handleAsync((o, ex) -> {
- // iteration has completed
- if (ex != null) {
- // an exception could have fired in the evalFuture - it bubbles up here so we have to
- // check for the iteration exception or something more general.
- final Throwable root = ExceptionUtils.getRootCause(ex);
- if (!iterationFired.get()) {
- // iteration wasn't executed so this must have raised during the eval process
- logger.warn(String.format("Exception processing a script on request [%s].", msg), root);
- ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION).statusMessage(root.getMessage()).create());
- return null;
- } else if (root.getClass().equals(TimeoutException.class)) {
- final String errorMessage = String.format("Response iteration and serialization exceeded the configured threshold for request [%s] - %s", msg, ex.getCause().getMessage());
+ });
+
+ evalFuture.handle((v, t) -> {
+ timerContext.stop();
+
+ if (t != null) {
+ if (t instanceof TimeoutException) {
+ final String errorMessage = String.format("Response evaluation exceeded the configured threshold for request [%s] - %s", msg, t.getMessage());
logger.warn(errorMessage);
- ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
+ ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(t.getMessage()).create());
} else {
- logger.warn(String.format("Exception processing a script on request [%s].", msg), root);
- ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(root.getMessage()).create());
+ logger.warn(String.format("Exception processing a script on request [%s].", msg), t);
+ ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION).statusMessage(t.getMessage()).create());
}
}
+
return null;
- }, executor);
+ });
}
/**
[2/2] incubator-tinkerpop git commit: Update changelog.
Posted by sp...@apache.org.
Update changelog.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/050eba77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/050eba77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/050eba77
Branch: refs/heads/master
Commit: 050eba77ac70695ec5aadda3bb6e850c8999876b
Parents: fcfc6ea
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed May 27 16:43:33 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed May 27 16:43:33 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/050eba77/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 330b5f6..0c098ef 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,7 @@ image::http://www.tinkerpop.com/docs/current/images/gremlin-hindu.png[width=225]
TinkerPop 3.0.0.GA (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+* Iteration of results in Gremlin Server occur in the same thread as evaluation and prior to transaction close.
* `GraphComputerTest` extended with validation of the semantics of all `ResultGraph`/`Persist` combinations.
* GiraphGraphComputer no longer requires an extra iteration and MapReduce job to derive the full `Memory` result.
* SparkGraphComputer now supports `InputRDD` to allow vendors/users to use a `SparkContext` to generate the adjacency list representation.