+The GraphComputer
+image:graphcomputer-puffers.png[width=350,float=right] TinkerPop3 provides two primary means of interacting with a
+graph: link:[online transaction processing] (OLTP) and
+link:[online analytical processing] (OLAP). OTLP-based
+graph systems allow the user to query the graph in real-time. However, typically, real-time performance is only
+possible when a local traversal is enacted. A local traversal is one that starts at a particular vertex (or small set
+of vertices) and touches a small set of connected vertices (by any arbitrary path of arbitrary length). In short, OLTP
+queries interact with a limited set of data and respond on the order of milliseconds or seconds. On the other hand,
+with OLAP graph processing, the entire graph is processed and thus, every vertex and edge is analyzed (some times
+more than once for iterative, recursive algorithms). Due to the amount of data being processed, the results are
+typically not returned in real-time and for massive graphs (i.e. graphs represented across a cluster of machines),
+results can take on the order of minutes or hours.
+ * *OLTP*: real-time, limited data accessed, random data access, sequential processing, querying
+ * *OLAP*: long running, entire data set accessed, sequential data access, parallel processing, batch processing
+The image above demonstrates the difference between Gremlin OLTP and Gremlin OLAP. With Gremlin OLTP, the graph is
+walked by moving from vertex-to-vertex via incident edges. With Gremlin OLAP, all vertices are provided a
+`VertexProgram`. The programs send messages to one another with the topological structure of the graph acting as the
+communication network (though random message passing possible). In many respects, the messages passed are like
+the OLTP traversers moving from vertex-to-vertex. However, all messages are moving independent of one another, in
+parallel. Once a vertex program is finished computing, TinkerPop3's OLAP engine supports any number
+link:[`MapReduce`] jobs over the resultant graph.
+IMPORTANT: `GraphComputer` was designed from the start to be used within a multi-JVM, distributed environment --
+in other words, a multi-machine compute cluster. As such, all the computing objects must be able to be migrated
+between JVMs. The pattern promoted is to store state information in a `Configuration` object to later be regenerated
+by a loading process. It is important to realize that `VertexProgram`, `MapReduce`, and numerous particular instances
+rely heavily on the state of the computing classes (not the structure, but the processes) to be stored in a
+image:bsp-diagram.png[width=400,float=right] GraphComputer takes a `VertexProgram`. A VertexProgram can be thought of
+as a piece of code that is executed at each vertex in logically parallel manner until some termination condition is
+met (e.g. a number of iterations have occurred, no more data is changing in the graph, etc.). A submitted
+`VertexProgram` is copied to all the workers in the graph. A worker is not an explicit concept in the API, but is
+assumed of all `GraphComputer` implementations. At minimum each vertex is a worker (though this would be inefficient
+due to the fact that each vertex would maintain a VertexProgram). In practice, the workers partition the vertex set
+and and are responsible for the execution of the VertexProgram over all the vertices within their sphere of influence.
+The workers orchestrate the execution of the `VertexProgram.execute()` method on all their vertices in an
+link:[bulk synchronous parallel] (BSP) fashion. The vertices
+are able to communicate with one another via messages. There are two kinds of messages in Gremlin OLAP:
+`MessageScope.Local` and `MessageScope.Global`. A local message is a message to an adjacent vertex. A global
+message is a message to any arbitrary vertex in the graph. Once the VertexProgram has completed its execution,
+any number of `MapReduce` jobs are evaluated. MapReduce jobs are provided by the user via `GraphComputer.mapReduce()`
+ or by the `VertexProgram` via `VertexProgram.getMapReducers()`.
+The example below demonstrates how to submit a VertexProgram to a graph's GraphComputer. `GraphComputer.submit()`
+yields a `Future<ComputerResult>`. The `ComputerResult` has the resultant computed graph which can be a full copy
+of the original graph (see <<hadoop-gremlin,Hadoop-Gremlin>>) or a view over the original graph (see
+<<tinkergraph,TinkerGraph>>). The ComputerResult also provides access to computational side-effects called `Memory`
+(which includes, for example, runtime, number of iterations, results of MapReduce jobs, and VertexProgram-specific
+memory manipulations).
+result = graph.compute().program(
+g = result.graph().traversal(standard())
+NOTE: This model of "vertex-centric graph computing" was made popular by Google's
+link:[Pregel] graph engine.
+In the open source world, this model is found in OLAP graph computing systems such as link:[Giraph],
+link:[Hama], and link:[Faunus]. TinkerPop3 extends the
+popularized model with integrated post-processing <<mapreduce,MapReduce>> jobs over the vertex set.
+The BSP model proposed by Pregel stores the results of the computation in a distributed manner as properties on the
+elements in the graph. In many situations, it is necessary to aggregate those resultant properties into a single
+result set (i.e. a statistic). For instance, assume a VertexProgram that computes a nominal cluster for each vertex
+(i.e. link:[a graph clustering algorithm]). At the end of the
+computation, each vertex will have a property denoting the cluster it was assigned to. TinkerPop3 provides the
+ability to answer global questions about the clusters. For instance, in order to answer the following questions,
+`MapReduce` jobs are required:
+ * How many vertices are in each cluster? (*presented below*)
+ * How many unique clusters are there? (*presented below*)
+ * What is the average age of each vertex in each cluster?
+ * What is the degree distribution of the vertices in each cluster?
+A compressed representation of the `MapReduce` API in TinkerPop3 is provided below. The key idea is that the
+`map`-stage processes all vertices to emit key/value pairs. Those values are aggregated on their respective key
+for the `reduce`-stage to do its processing to ultimately yield more key/value pairs.
+public interface MapReduce<MK, MV, RK, RV, R> {
+  public void map(final Vertex vertex, final MapEmitter<MK, MV> emitter);
+  public void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter);
+  // there are more methods
+IMPORTANT: The vertex that is passed into the `` method does not contain edges. The vertex only
+contains original and computed vertex properties. This reduces the amount of data required to be loaded and ensures
+that MapReduce is used for post-processing computed results. All edge-based computing should be accomplished in the
+The `MapReduce` extension to GraphComputer is made explicit when examining the
+<<peerpressurevertexprogram,`PeerPressureVertexProgram`>> and corresponding `ClusterPopulationMapReduce`.
+In the code below, the GraphComputer result returns the computed on `Graph` as well as the `Memory` of the
+computation (`ComputerResult`). The memory maintain the results of any MapReduce jobs. The cluster population
+MapReduce result states that there are 5 vertices in cluster 1 and 1 vertex in cluster 6. This can be verified
+(in a serial manner) by looking at the `PeerPressureVertexProgram.CLUSTER` property of the resultant graph. Notice
+that the property is "hidden" unless it is directly accessed via name.
+graph = TinkerFactory.createModern()
+result = graph.compute().program(
+g = result.graph().traversal(standard())
+If there are numerous statistics desired, then its possible to register as many MapReduce jobs as needed. For
+instance, the `ClusterCountMapReduce` determines how many unique clusters were created by the peer pressure algorithm.
+Below both `ClusterCountMapReduce` and `ClusterPopulationMapReduce` are computed over the resultant graph.
+result = graph.compute().program(
+           mapReduce(
+           mapReduce(
+IMPORTANT: The MapReduce model of TinkerPop3 does not support MapReduce chaining. Thus, the order in which the
+MapReduce jobs are executed is irrelevant. This is made apparent when realizing that the `map()`-stage takes a
+`Vertex` as its input and the `reduce()`-stage yields key/value pairs. Thus, the results of reduce can not feed back
+into map.
+A Collection of VertexPrograms
+TinkerPop3 provides a collection of VertexPrograms that implement common algorithms. This section discusses the various implementations.
+IMPORTANT: The vertex programs presented are what are provided as of TinkerPop x.y.z. Over time, with future releases,
+more algorithms will be added.
+image:gremlin-pagerank.png[width=400,float=right] link:[PageRank] is perhaps the
+most popular OLAP-oriented graph algorithm. This link:[eigenvector centrality]
+variant was developed by Brin and Page of Google. PageRank defines a centrality value for all vertices in the graph,
+where centrality is defined recursively where a vertex is central if it is connected to central vertices. PageRank is
+an iterative algorithm that converges to a link:[steady state distribution]. If
+the pageRank values are normalized to 1.0, then the pageRank value of a vertex is the probability that a random walker
+will be seen that that vertex in the graph at any arbitrary moment in time. In order to help developers understand the
+methods of a `VertexProgram`, the PageRankVertexProgram code is analyzed below.
+public class PageRankVertexProgram implements VertexProgram<Double> { <1>
+    private MessageScope.Local<Double> incidentMessageScope = MessageScope.Local.of(__::outE); <2>
+    private MessageScope.Local<Double> countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope));
+    public static final String PAGE_RANK = "gremlin.pageRankVertexProgram.pageRank"; <3>
+    public static final String EDGE_COUNT = "gremlin.pageRankVertexProgram.edgeCount";
+    private static final String VERTEX_COUNT = "gremlin.pageRankVertexProgram.vertexCount";
+    private static final String ALPHA = "gremlin.pageRankVertexProgram.alpha";
+    private static final String TOTAL_ITERATIONS = "gremlin.pageRankVertexProgram.totalIterations";
+    private static final String INCIDENT_TRAVERSAL_SUPPLIER = "gremlin.pageRankVertexProgram.incidentTraversalSupplier";
+    private ConfigurationTraversal<Vertex, Edge> configurationTraversal;
+    private double vertexCountAsDouble = 1.0d;
+    private double alpha = 0.85d;
+    private int totalIterations = 30;
+    private static final Set<String> COMPUTE_KEYS = new HashSet<>(Arrays.asList(PAGE_RANK, EDGE_COUNT));
+    private PageRankVertexProgram() {}
+    @Override
+    public void loadState(final Graph graph, final Configuration configuration) { <4>
+        if (configuration.containsKey(TRAVERSAL_SUPPLIER)) {
+                    this.configurationTraversal = ConfigurationTraversal.loadState(graph, configuration, TRAVERSAL_SUPPLIER);
+                    this.incidentMessageScope = MessageScope.Local.of(this.configurationTraversal);
+                    this.countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope));
+                }
+        this.vertexCountAsDouble = configuration.getDouble(VERTEX_COUNT, 1.0d);
+        this.alpha = configuration.getDouble(ALPHA, 0.85d);
+        this.totalIterations = configuration.getInt(TOTAL_ITERATIONS, 30);
+    }
+    @Override
+    public void storeState(final Configuration configuration) {
+        configuration.setProperty(VERTEX_PROGRAM, PageRankVertexProgram.class.getName());
+        configuration.setProperty(VERTEX_COUNT, this.vertexCountAsDouble);
+        configuration.setProperty(ALPHA, this.alpha);
+        configuration.setProperty(TOTAL_ITERATIONS, this.totalIterations);
+        if (null != this.traversalSupplier) {
+            this.traversalSupplier.storeState(configuration);
+        }
+    }
+    @Override
+    public Set<String> getElementComputeKeys() { <5>
+        return COMPUTE_KEYS;
+    }
+    @Override
+    public Optional<MessageCombiner<Double>> getMessageCombiner() {
+        return (Optional) PageRankMessageCombiner.instance();
+    }
+    @Override
+    public Set<MessageScope> getMessageScopes(final int iteration) {
+        final Set<MessageScope> set = new HashSet<>();
+        set.add(0 == iteration ? this.countMessageScope : this.incidentMessageScope);
+        return set;
+    }
+    @Override
+    public void setup(final Memory memory) {
+    }
+   @Override
+    public void execute(final Vertex vertex, Messenger<Double> messenger, final Memory memory) { <6>
+        if (memory.isInitialIteration()) {  <7>
+            messenger.sendMessage(this.countMessageScope, 1.0d);
+        } else if (1 == memory.getIteration()) {  <8>
+            double initialPageRank = 1.0d / this.vertexCountAsDouble;
+            double edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b);
+  , initialPageRank);
+  , edgeCount);
+            messenger.sendMessage(this.incidentMessageScope, initialPageRank / edgeCount);
+        } else { <9>
+            double newPageRank = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b);
+            newPageRank = (this.alpha * newPageRank) + ((1.0d - this.alpha) / this.vertexCountAsDouble);
+  , newPageRank);
+            messenger.sendMessage(this.incidentMessageScope, newPageRank / vertex.<Double>value(EDGE_COUNT));
+        }
+    }
+    @Override
+    public boolean terminate(final Memory memory) { <10>
+        return memory.getIteration() >= this.totalIterations;
+    }
+    @Override
+    public String toString() {
+        return StringFactory.vertexProgramString(this, "alpha=" + this.alpha + ",iterations=" + this.totalIterations);
+    }
+<1> `PageRankVertexProgram` implements `VertexProgram<Double>` because the messages it sends are Java doubles.
+<2> The default path of energy propagation is via outgoing edges from the current vertex.
+<3> The resulting PageRank values for the vertices are stored as a hidden property.
+<4> A vertex program is constructed using an Apache `Configuration` to ensure easy dissemination across a cluster of JVMs.
+<5> A vertex program must define the "compute keys" that are the properties being operated on during the computation.
+<6> The "while"-loop of the vertex program.
+<7> In order to determine how to distribute the energy to neighbors, a "1"-count is used to determine how many incident vertices exist for the `MessageScope`.
+<8> Initially, each vertex is provided an equal amount of energy represented as a double.
+<9> Energy is aggregated, computed on according to the PageRank algorithm, and then disseminated according to the defined `MessageScope.Local`.
+<10> The computation is terminated after a pre-defined number of iterations.
+The above `PageRankVertexProgram` is used as follows.
+result = graph.compute().program(
+g = result.graph().traversal(standard())
+The `PeerPressureVertexProgram` is a clustering algorithm that assigns a nominal value to each vertex in the graph.
+The nominal value represents the vertex's cluster. If two vertices have the same nominal value, then they are in the
+same cluster. The algorithm proceeds in the following manner.
+ . Every vertex assigns itself to a unique cluster ID (initially, its vertex ID).
+ . Every vertex determines its per neighbor vote strength as 1.0d / incident edges count.
+ . Every vertex sends its cluster ID and vote strength to its adjacent vertices as a `Pair<Serializable,Double>`
+ . Every vertex generates a vote energy distribution of received cluster IDs and changes its current cluster ID to the most frequent cluster ID.
+  .. If there is a tie, then the cluster with the lowest `toString()` comparison is selected.
+ . Steps 3 and 4 repeat until either a max number of iterations has occurred or no vertex has adjusted its cluster anymore.
+The `BulkLoaderVertexProgram` provides a generalized way for loading graphs of any size (preferably large sized graphs)
+into a persistent `Graph`. The input can be any existing `Graph` database supporting TinkerPop3 or any of the Hadoop
+GraphInputFormats (e.g. `GraphSONInputFormat`, `GryoInputFormat` or `ScriptInputFormat`). The following example
+demonstrates how to load data from one TinkerGraph to another:
+writeGraphConf = new BaseConfiguration()
+writeGraphConf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph")
+writeGraphConf.setProperty("gremlin.tinkergraph.graphFormat", "gryo")
+writeGraphConf.setProperty("gremlin.tinkergraph.graphLocation", "/tmp/tinkergraph.kryo")
+modern = TinkerFactory.createModern()
+blvp =
+           keepOriginalIds(false).
+           writeGraph(writeGraphConf).create(modern)
+graph =
+g = graph.traversal()
+.Available configuration options
+|Builder Method    |Purpose | Default Value
+|`bulkLoader(Class\|String)` | Sets the class of the bulk loader implementation. | `IncrementalBulkLoader`
+|`vertexIdProperty(String)` | Sets the name of the property in the target graph that holds the vertex id from the
+source graph. | ``
+|`keepOriginalIds(boolean)` |Whether to keep the id's from the source graph in the target graph or not. It's
+recommended to keep them if it's planned to do further bulk loads using the same datasources. | `true`
+|`userSuppliedIds(boolean)` |Whether to use the id's from the source graph as id's in the target graph. If set to
+`true`, `vertexIdProperty` will be ignored. Note, that the target graph must support user supplied identifiers. | `false`
+|`intermediateBatchSize(int)` |Sets the batch size for intermediate transactions. This is per thread in a
+multi-threaded environment. +0+ means that transactions will only be committed at the end of an iteration cycle.
+It's recommended to tune this property for the target graph and not use the default value of +0+. | `0`
+|`writeGraph(String)` | Sets the path to a `GraphFactory` compatible configuration file for the target graph. | _none_
+NOTE: `BulkLoaderVertexProgram` comes with a default `BulkLoader` implementation, namely `IncrementalBulkLoader`. It
+will work for the most use-cases, but has one limitation though: It doesn't support multi-valued properties.
+`IncrementalBulkLoader` will handle every property as a single-valued property. A custom `BulkLoader` implementation
+has to be used if the default behavior is not sufficient.
+NOTE: A custom `BulkLoader` implementation for incremental loading should use `GraphTraversal` methods to create/update
+elements (e.g. `g.addV()` instead of `graph.addVertex()`). This way the `BulkLoaderVertexProgram` is able to efficiently
+track changes in the underlying graph and can apply several optimization techniques.
+image:traversal-vertex-program.png[width=250,float=left] The `TraversalVertexProgram` is a "special" VertexProgram in
+that it can be executed via a `GraphTraversal` with a `ComputerTraversalEngine`. In Gremlin, it is possible to have
+the same traversal executed using either the standard OTLP-engine or the `GraphComputer` OLAP-engine. The difference
+being where the traversal is submitted.
+NOTE: This model of graph traversal in a BSP system was first implemented by the
+link:[Faunus] graph analytics engine and originally described in
+link:[Local and Distributed Traversal Engines].
+g = graph.traversal(standard())
+g.V().both().hasLabel('person').values('age').groupCount().next() // OLTP
+g = graph.traversal(computer())
+g.V().both().hasLabel('person').values('age').groupCount().next() // OLAP
+In the OLAP example above, a `TraversalVertexProgram` is (logically) sent to each vertex in the graph. Each instance
+evaluation requires (logically) 5 BSP iterations and each iteration is interpreted as such:
+ . `g.V()`: Put a traverser on each vertex in the graph.
+ . `both()`: Propagate each traverser to the vertices `both`-adjacent to its current vertex.
+ . `hasLabel('person')`: If the vertex is not a person, kill the traversers at that vertex.
+ . `values('age')`: Have all the traversers reference the integer age of their current vertex.
+ . `groupCount()`: Count how many times a particular age has been seen.
+While 5 iterations were presented, in fact, `TraversalVertexProgram` will execute the traversal in only
+3 iterations. The reason being is that `hasLabel('person').values('age').groupCount()` can all be executed in a
+single iteration as any message sent would simply be to the current executing vertex. Thus, a simple optimization
+exists in Gremlin OLAP called "reflexive message passing" which simulates non-message-passing BSP iterations within a
+single BSP iteration.
+When the computation is complete a <<mapreduce,MapReduce>> job executes which aggregates all the `groupCount()`
+sideEffect Map (i.e. "`HashMap`") objects on each vertex into a single local representation (thus, turning the
+distributed Map representation into a local Map representation).
+The same OLAP traversal can be executed using the standard `g.compute()` model, though at the expense of verbosity.
+`TraversalVertexProgram` provides a fluent `Builder` for constructing a `TraversalVertexProgram`. The specified
+`traversal()` can be either a `Supplier<Traversal>` object, a `Supplier<Traversal>` class, or a
+link:[JSR-223] script that will generate (i.e. supply) a
+`Traversal`. If `traversal()` is supplied a single string, it is assumed that "gremlin-groovy" is the `ScriptEngine`
+to use. If two strings are supplied, then the first string denotes the `ScriptEngine` to evaluate the second string
+script with in order to generate (i.e. supply) the `Traversal`.
+//result = g.compute().program('person').values('age').groupCount('a')).create()).submit().get()
+Distributed Gremlin Gotchas
+Gremlin OLTP is not identical to Gremlin OLAP.
+IMPORTANT: There are two primary theoretical differences between Gremlin OLTP and Gremlin OLAP. First, Gremlin OLTP
+(via `Traversal`) leverages a link:[depth-first] execution engine.
+Depth-first execution has a limited memory footprint due to link:[lazy evaluation].
+On the other hand, Gremlin OLAP (via `TraversalVertexProgram`) leverages a
+link:[breadth-first] execution engine which maintains a larger memory
+footprint, but a better time complexity due to vertex-local traversers being able to be merged. The second difference
+is that Gremlin OLTP is executed in a serial fashion, while Gremlin OLAP is executed in a parallel fashion. These two
+fundamental differences lead to the behaviors enumerated below.
+. Traversal sideEffects are represented as a distributed data structure across the graph's vertex set. It is not
+possible to get a global view of a sideEffect until it is aggregated via a <<mapreduce,MapReduce>> job. In some
+situations, the local vertex representation of the sideEffect is sufficient to ensure the intended semantics of the
+traversal are respected. However, this is not generally true so be wary of traversals that require global views of a
+. When evaluating traversals that rely on path information (i.e. the history of the traversal), practical
+computational limits can easily be reached due the link:[combinatoric explosion]
+of data. With path computing enabled, every traverser is unique and thus, must be enumerated as opposed to being
+counted/merged. The difference being a collection of paths vs. a single 64-bit long at a single vertex. For more
+information on this concept, please see link:[Faunus Provides Big Graph Data].
+. When traversals of the form `'a').y.someSideEffectStep('a').z` are evaluated, the `a` object is stored in the
+path information of the traverser and thus, such traversals (may) turn on path calculations when executed on a
+. Steps that are concerned with the global ordering of traversers do not have a meaningful representation in
+OLAP. For example, what does <<order-step,`order()`>>-step mean when all traversers are being processed in parallel?
+Even if the traversers were aggregated and ordered, then at the next step they would return to being executed in
+parallel and thus, in an unpredictable order. When `order()`-like steps are executed at the end of a traversal (i.e
+the final step), the `TraverserMapReduce` job ensures the resultant serial representation is ordered accordingly.
+. Steps that are concerned with providing a global aggregate to the next step of computation do not have a correlate
+in OLAP. For example, <<fold-step,`fold()`>>-step can only fold up the objects at each executing vertex. Next, even
+if a global fold was possible, where would it go? Which vertex would be the host of the data structure? The
+`fold()`-step only makes sense as an end-step whereby a MapReduce job can generate the proper global-to-local data