You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:49 UTC

[23/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop directories to org/apache/tinkerpop

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
new file mode 100644
index 0000000..8d573ea
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
+import org.javatuples.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The Memory of a {@link GraphComputer} is a global data structure whereby vertices can communicate information with one another.
+ * Moreover, it also contains global information about the state of the computation such as runtime and the current iteration.
+ * The Memory data is logically updated in parallel using associative/commutative methods which have embarrassingly parallel implementations.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Memory {
+
+    /**
+     * Whether the key exists in the memory.
+     *
+     * @param key key to search the memory for.
+     * @return whether the key exists
+     */
+    public default boolean exists(final String key) {
+        return this.keys().contains(key);
+    }
+
+    /**
+     * The set of keys currently associated with this memory.
+     *
+     * @return the memory's key set.
+     */
+    public Set<String> keys();
+
+    /**
+     * Get the value associated with the provided key.
+     *
+     * @param key the key of the value
+     * @param <R> the type of the value
+     * @return the value
+     * @throws IllegalArgumentException if the key does not exist
+     */
+    public <R> R get(final String key) throws IllegalArgumentException;
+
+    /**
+     * Set the value of the provided key. This is typically called in setup() and/or terminate() of the {@link VertexProgram}.
+     * If this is called during execute(), there is no guarantee as to the ultimately stored value as call order is indeterminate.
+     *
+     * @param key   the key to set a value for
+     * @param value the value to set for the key
+     */
+    public void set(final String key, Object value);
+
+    /**
+     * A helper method that generates a {@link Map} of the memory key/values.
+     *
+     * @return the map representation of the memory key/values
+     */
+    public default Map<String, Object> asMap() {
+        final Map<String, Object> map = keys().stream()
+                .filter(this::exists)
+                .map(key -> Pair.with(key, get(key)))
+                .collect(Collectors.toMap(kv -> kv.getValue0(), Pair::getValue1));
+        return Collections.unmodifiableMap(map);
+    }
+
+    /**
+     * Get the current iteration number.
+     *
+     * @return the current iteration
+     */
+    public int getIteration();
+
+    /**
+     * Get the number of milliseconds the {@link GraphComputer} has been executing thus far.
+     *
+     * @return the total time in milliseconds
+     */
+    public long getRuntime();
+
+    /**
+     * Add the provided delta value to the long value currently stored at the key.
+     *
+     * @param key   the key of the long value
+     * @param delta the adjusting amount (can be negative for decrement)
+     * @return the adjusted value (according to the previous iteration's current value + delta)
+     */
+    public long incr(final String key, final long delta);
+
+    /**
+     * Logically AND the provided boolean value with the boolean value currently stored at the key.
+     *
+     * @param key  the key of the boolean value
+     * @param bool the boolean to AND
+     * @return the adjusted value (according to the previous iteration's current value && bool)
+     */
+    public boolean and(final String key, final boolean bool);
+
+    /**
+     * Logically OR the provided boolean value with the boolean value currently stored at the key.
+     *
+     * @param key  the key of the boolean value
+     * @param bool the boolean to OR
+     * @return the adjusted value (according to the previous iteration's current value || bool)
+     */
+    public boolean or(final String key, final boolean bool);
+
+    /**
+     * A helper method that states whether the current iteration is 0.
+     *
+     * @return whether this is the first iteration
+     */
+    public default boolean isInitialIteration() {
+        return this.getIteration() == 0;
+    }
+
+    /**
+     * The Admin interface is used by the {@link GraphComputer} to update the Memory.
+     * The developer should never need to type-cast the provided Memory to Memory.Admin.
+     */
+    public interface Admin extends Memory {
+
+        public default void incrIteration() {
+            this.setIteration(this.getIteration() + 1);
+        }
+
+        public void setIteration(final int iteration);
+
+        public void setRuntime(final long runtime);
+
+        public default Memory asImmutable() {
+            return new ImmutableMemory(this);
+        }
+    }
+
+
+    public static class Exceptions {
+
+        public static IllegalArgumentException memoryKeyCanNotBeEmpty() {
+            return new IllegalArgumentException("Graph computer memory key can not be the empty string");
+        }
+
+        public static IllegalArgumentException memoryKeyCanNotBeNull() {
+            return new IllegalArgumentException("Graph computer memory key can not be null");
+        }
+
+        public static IllegalArgumentException memoryValueCanNotBeNull() {
+            return new IllegalArgumentException("Graph computer memory value can not be null");
+        }
+
+        public static IllegalStateException memoryIsCurrentlyImmutable() {
+            return new IllegalStateException("Graph computer memory is currently immutable");
+        }
+
+        public static IllegalArgumentException memoryDoesNotExist(final String key) {
+            return new IllegalArgumentException("The memory does not have a value for provided key: " + key);
+        }
+
+        public static UnsupportedOperationException dataTypeOfMemoryValueNotSupported(final Object val) {
+            return new UnsupportedOperationException(String.format("Graph computer memory value [%s] is of type %s is not supported", val, val.getClass()));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
new file mode 100644
index 0000000..c2d7535
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+/**
+ * A MessageCombiner allows two messages en route to the same vertex to be aggregated into a single message.
+ * Message combining can reduce the number of messages sent between vertices and thus, reduce network traffic.
+ * Not all messages can be combined and thus, this is an optional feature of a {@link VertexProgram}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface MessageCombiner<M> {
+
+    /**
+     * Combine two messages and return a message containing the combination.
+     * In many instances, it is possible to simply merge the data in the second message into the first message.
+     * Such an optimization can limit the amount of object creation.
+     *
+     * @param messageA the first message
+     * @param messageB the second message
+     * @return the combination of the two messages
+     */
+    public M combine(final M messageA, final M messageB);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
new file mode 100644
index 0000000..75e4965
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Arrays;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+/**
+ * A {@link MessageScope} represents the range of a message. A message can have multiple receivers and message scope
+ * allows the underlying {@link GraphComputer} to apply the message passing algorithm in whichever manner is most efficient.
+ * It is best to use {@link MessageScope.Local} if possible as that provides more room for optimization by vendors than {@link MessageScope.Global}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public abstract class MessageScope {
+
+    /**
+     * A Global message is directed at an arbitrary vertex in the graph.
+     * The recipient vertex need not be adjacent to the sending vertex.
+     * This message scope should be avoided if a {@link Local} can be used.
+     */
+    public final static class Global extends MessageScope {
+
+        private static final Global INSTANCE = Global.of();
+
+        private final Iterable<Vertex> vertices;
+
+        private Global(final Iterable<Vertex> vertices) {
+            this.vertices = vertices;
+        }
+
+        public static Global of(final Iterable<Vertex> vertices) {
+            return new Global(vertices);
+        }
+
+        public static Global of(final Vertex... vertices) {
+            return new Global(Arrays.asList(vertices));
+        }
+
+        public Iterable<Vertex> vertices() {
+            return this.vertices;
+        }
+
+        public static Global instance() {
+            return INSTANCE;
+        }
+    }
+
+    /**
+     * A Local message is directed to an adjacent (or "memory adjacent") vertex.
+     * The adjacent vertex set is defined by the provided {@link Traversal} that dictates how to go from the sending vertex to the receiving vertex.
+     * This is the preferred message scope as it can potentially be optimized by the underlying {@link Messenger} implementation.
+     * The preferred optimization is to not distribute a message object to all adjacent vertices.
+     * Instead, allow the recipients to read a single message object stored at the "sending" vertex.
+     * This is possible via `Traversal.reverse()`. This optimization greatly reduces the amount of data created in the computation.
+     *
+     * @param <M> The {@link VertexProgram} message class
+     */
+    public final static class Local<M> extends MessageScope {
+        public final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal;
+        public final BiFunction<M, Edge, M> edgeFunction;
+
+        private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
+            this.incidentTraversal = incidentTraversal;
+            this.edgeFunction = (final M m, final Edge e) -> m; // the default is an identity function
+        }
+
+        private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
+            this.incidentTraversal = incidentTraversal;
+            this.edgeFunction = edgeFunction;
+        }
+
+        public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
+            return new Local(incidentTraversal);
+        }
+
+        public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
+            return new Local<>(incidentTraversal, edgeFunction);
+        }
+
+        public BiFunction<M, Edge, M> getEdgeFunction() {
+            return this.edgeFunction;
+        }
+
+        public Supplier<? extends Traversal<Vertex, Edge>> getIncidentTraversal() {
+            return this.incidentTraversal;
+        }
+
+        /**
+         * A helper class that can be used to generate the reverse traversal of the traversal within a {@link MessageScope.Local}.
+         */
+        public static class ReverseTraversalSupplier implements Supplier<Traversal<Vertex, Edge>> {
+
+            private final MessageScope.Local<?> localMessageScope;
+
+            public ReverseTraversalSupplier(final MessageScope.Local<?> localMessageScope) {
+                this.localMessageScope = localMessageScope;
+            }
+
+            public Traversal<Vertex, Edge> get() {
+                return this.localMessageScope.getIncidentTraversal().get().asAdmin().reverse();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
new file mode 100644
index 0000000..087b61a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+/**
+ * The {@link Messenger} serves as the routing system for messages between vertices. For distributed systems,
+ * the messenger can implement a "message passing" engine (distributed memory). For single machine systems, the
+ * messenger can implement a "state sharing" engine (shared memory). Each messenger is tied to the particular
+ * vertex distributing the message.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface Messenger<M> {
+
+    /**
+     * The currently executing vertex can receive the messages of the provided {@link MessageScope}.
+     *
+     * @param messageScope the message scope of the messages to receive
+     * @return the messages for that vertex
+     */
+    public Iterable<M> receiveMessages(final MessageScope messageScope);
+
+    /**
+     * The currently executing vertex can send a message with provided {@link MessageScope}.
+     *
+     * @param messageScope the message scope of the message being sent
+     * @param message      the message to send
+     */
+    public void sendMessage(final MessageScope messageScope, final M message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
new file mode 100644
index 0000000..c207465
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.structure.Vertex;
+import org.apache.commons.configuration.Configuration;
+
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A {@link VertexProgram} represents one component of a distributed graph computation. Each vertex in the graph
+ * (logically) executes the {@link VertexProgram} instance in parallel. The collective behavior yields
+ * the computational result. In practice, a "worker" (i.e. task, thread, etc.) is responsible for executing the
+ * VertexProgram against each vertex that it has in its vertex set (a subset of the full graph vertex set).
+ * At minimum there is one "worker" for each vertex, though this is impractical and {@link GraphComputer}
+ * implementations that leverage such a design are not expected to perform well due to the excess object creation.
+ * Any local state/fields in a VertexProgram is static to the vertices within the same worker set.
+ * It is not safe to assume that the VertexProgram's "worker" state will remain stable between iterations.
+ * Hence, the existence of {@link VertexProgram#workerIterationStart} and {@link VertexProgram#workerIterationEnd}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface VertexProgram<M> extends Cloneable {
+
+    public static final String VERTEX_PROGRAM = "gremlin.vertexProgram";
+
+    /**
+     * When it is necessary to store the state of the VertexProgram, this method is called.
+     * This is typically required when the VertexProgram needs to be serialized to another machine.
+     * Note that what is stored is simply the instance/configuration state, not any processed data.
+     * The default implementation provided simply stores the VertexProgram class name for reflective reconstruction.
+     * It is typically a good idea to call VertexProgram.super.storeState().
+     *
+     * @param configuration the configuration to store the state of the VertexProgram in.
+     */
+    public default void storeState(final Configuration configuration) {
+        configuration.setProperty(VERTEX_PROGRAM, this.getClass().getName());
+    }
+
+    /**
+     * When it is necessary to load the state of the VertexProgram, this method is called.
+     * This is typically required when the VertexProgram needs to be serialized to another machine.
+     * Note that what is loaded is simply the instance state, not any processed data.
+     * It is possible for this method to be called for each and every vertex in the graph.
+     * Thus, it is important to only store/load that state which is needed during execution in order to reduce startup time.
+     *
+     * @param configuration the configuration to load the state of the VertexProgram from.
+     */
+    public default void loadState(final Configuration configuration) {
+
+    }
+
+    /**
+     * The method is called at the beginning of the computation.
+     * The method is global to the {@link GraphComputer} and as such, is not called for each vertex.
+     * During this stage, the {@link Memory} should be initialized to its "start state."
+     *
+     * @param memory The global memory of the GraphComputer
+     */
+    public void setup(final Memory memory);
+
+    /**
+     * This method denotes the main body of the computation and is executed on each vertex in the graph.
+     * This method is logically executed in parallel on all vertices in the graph.
+     * When the {@link Memory} is read, it is according to the aggregated state yielded in the previous iteration.
+     * When the {@link Memory} is written, the data will be aggregated at the end of the iteration for reading in the next iteration.
+     *
+     * @param vertex    the {@link Vertex} to execute the {@link VertexProgram} on
+     * @param messenger the messenger that moves data between vertices
+     * @param memory    the shared state between all vertices in the computation
+     */
+    public void execute(final Vertex vertex, final Messenger<M> messenger, final Memory memory);
+
+    /**
+     * The method is called at the end of each iteration to determine if the computation is complete.
+     * The method is global to the {@link GraphComputer} and as such, is not called for each {@link Vertex}.
+     * The {@link Memory} maintains the aggregated data from the last execute() iteration.
+     *
+     * @param memory The global memory of the {@link GraphComputer}
+     * @return whether or not to halt the computation
+     */
+    public boolean terminate(final Memory memory);
+
+    /**
+     * This method is called at the start of each iteration of each "computational chunk."
+     * The set of vertices in the graph is typically not processed with full parallelism.
+     * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
+     * The typical use is to create static VertexProgram state that exists for the iteration of the vertex subset.
+     * The default implementation is a no-op.
+     *
+     * @param memory The memory at the start of the iteration.
+     */
+    public default void workerIterationStart(final Memory memory) {
+
+    }
+
+    /**
+     * This method is called at the end of each iteration of each "computational chunk."
+     * The set of vertices in the graph is typically not processed with full parallelism.
+     * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
+     * The typical use is to destroy static VertexProgram state that existed during the iteration of the vertex subset.
+     * The default implementation is a no-op.
+     *
+     * @param memory The memory at the end of the iteration.
+     */
+    public default void workerIterationEnd(final Memory memory) {
+
+    }
+
+    /**
+     * The {@link com.tinkerpop.gremlin.structure.Element} properties that will be mutated during the computation.
+     * All properties in the graph are readable, but only the keys specified here are writable.
+     * The default is an empty set.
+     *
+     * @return the set of element keys that will be mutated during the vertex program's execution
+     */
+    public default Set<String> getElementComputeKeys() {
+        return Collections.emptySet();
+    }
+
+    /**
+     * The {@link Memory} keys that will be used during the computation.
+     * These are the only keys that can be read or written throughout the life of the {@link GraphComputer}.
+     * The default is an empty set.
+     *
+     * @return the set of memory keys that will be read/written
+     */
+    public default Set<String> getMemoryComputeKeys() {
+        return Collections.emptySet();
+    }
+
+    /**
+     * Combine the messages en route to a particular vertex. Useful to reduce the amount of data transmitted over the wire.
+     * For example, instead of sending two objects that will ultimately be merged at the vertex destination, merge/combine into one and send that object.
+     * If no message combiner is provided, then no messages will be combined.
+     * Furthermore, it is not guaranteed that all messages en route to the vertex will be combined and thus, combiner-state should not be used.
+     * The result of the vertex program algorithm should be the same regardless of whether message combining is executed or not.
+     *
+     * @return An optional denoting whether or not there is a message combiner associated with the vertex program.
+     */
+    public default Optional<MessageCombiner<M>> getMessageCombiner() {
+        return Optional.empty();
+    }
+
+    /**
+     * This method returns all the {@link MessageScope} possibilities for a particular iteration of the vertex program.
+     * The returned message scopes are the scopes that will be used to send messages during the stated iteration.
+     * It is not a requirement that all stated message scopes be used, just that it is possible that they be used during the iteration.
+     *
+     * @param memory an immutable form of the {@link Memory}
+     * @return all possible message scopes during said vertex program iteration
+     */
+    public Set<MessageScope> getMessageScopes(final Memory memory);
+
+    /**
+     * The set of {@link MapReduce} jobs that are associated with the {@link VertexProgram}.
+     * This is not necessarily the exhaustive list over the life of the {@link GraphComputer}.
+     * If MapReduce jobs are declared by GraphComputer.mapReduce(), they are not contained in this set.
+     * The default is an empty set.
+     *
+     * @return the set of {@link MapReduce} jobs associated with this {@link VertexProgram}
+     */
+    public default Set<MapReduce> getMapReducers() {
+        return Collections.emptySet();
+    }
+
+    /**
+     * When multiple workers on a single machine need VertexProgram instances, it is possible to use clone.
+     * This will provide a speedier way of generating instances, over the {@link VertexProgram#storeState} and {@link VertexProgram#loadState} model.
+     * The default implementation simply returns the object as it assumes that the VertexProgram instance is a stateless singleton.
+     *
+     * @return A clone of the VertexProgram object
+     * @throws CloneNotSupportedException
+     */
+    public VertexProgram<M> clone() throws CloneNotSupportedException;
+
+    /**
+     * A helper method to construct a {@link VertexProgram} given the content of the supplied configuration.
+     * The class of the VertexProgram is read from the {@link VertexProgram#VERTEX_PROGRAM} static configuration key.
+     * Once the VertexProgram is constructed, {@link VertexProgram#loadState} method is called with the provided configuration.
+     *
+     * @param configuration A configuration with requisite information to build a vertex program
+     * @param <V>           The vertex program type
+     * @return the newly constructed vertex program
+     */
+    public static <V extends VertexProgram> V createVertexProgram(final Configuration configuration) {
+        try {
+            final Class<V> vertexProgramClass = (Class) Class.forName(configuration.getString(VERTEX_PROGRAM));
+            final Constructor<V> constructor = vertexProgramClass.getDeclaredConstructor();
+            constructor.setAccessible(true);
+            final V vertexProgram = constructor.newInstance();
+            vertexProgram.loadState(configuration);
+            return vertexProgram;
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public interface Builder {
+
+        public Builder configure(final Object... keyValues);
+
+        public <P extends VertexProgram> P create();
+
+    }
+
+    public default Features getFeatures() {
+        return new Features() {
+        };
+    }
+
+    public interface Features {
+        public default boolean requiresGlobalMessageScopes() {
+            return false;
+        }
+
+        public default boolean requiresLocalMessageScopes() {
+            return false;
+        }
+
+        public default boolean requiresVertexAddition() {
+            return false;
+        }
+
+        public default boolean requiresVertexRemoval() {
+            return false;
+        }
+
+        public default boolean requiresVertexPropertyAddition() {
+            return false;
+        }
+
+        public default boolean requiresVertexPropertyRemoval() {
+            return false;
+        }
+
+        public default boolean requiresEdgeAddition() {
+            return false;
+        }
+
+        public default boolean requiresEdgeRemoval() {
+            return false;
+        }
+
+        public default boolean requiresEdgePropertyAddition() {
+            return false;
+        }
+
+        public default boolean requiresEdgePropertyRemoval() {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
new file mode 100644
index 0000000..8d39f64
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Property;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+/**
+ * A {@link StaticMapReduce} that counts the distinct values stored under
+ * {@link PeerPressureVertexProgram#CLUSTER} across the mapped vertices. The map stage emits each
+ * vertex's cluster value under a single {@link MapReduce.NullObject} key and the reduce stage emits
+ * the number of distinct values it saw.
+ */
+public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject, Serializable, MapReduce.NullObject, Integer, Integer> {
+
+    public static final String CLUSTER_COUNT_MEMORY_KEY = "gremlin.clusterCountMapReduce.memoryKey";
+    public static final String DEFAULT_MEMORY_KEY = "clusterCount";
+
+    private String memoryKey = DEFAULT_MEMORY_KEY;
+
+
+    private ClusterCountMapReduce() {
+
+    }
+
+    private ClusterCountMapReduce(final String memoryKey) {
+        this.memoryKey = memoryKey;
+    }
+
+    /**
+     * Writes the memory key to the configuration, after the superclass has stored its own state.
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        configuration.setProperty(CLUSTER_COUNT_MEMORY_KEY, this.memoryKey);
+    }
+
+    /**
+     * Reads the memory key from the configuration, falling back to {@link #DEFAULT_MEMORY_KEY}.
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        this.memoryKey = configuration.getString(CLUSTER_COUNT_MEMORY_KEY, DEFAULT_MEMORY_KEY);
+    }
+
+    /**
+     * Runs every stage except the combine stage. {@link #combine} emits a partial distinct-count
+     * (an {@code Integer}); if that output were handed to {@link #reduce} as its input values, the
+     * reducer would count distinct partial counts instead of distinct clusters.
+     */
+    @Override
+    public boolean doStage(final Stage stage) {
+        return !stage.equals(Stage.COMBINE);
+    }
+
+    /**
+     * Emits the vertex's cluster value, if the vertex has one, under the shared null key.
+     */
+    @Override
+    public void map(final Vertex vertex, final MapEmitter<NullObject, Serializable> emitter) {
+        final Property<Serializable> cluster = vertex.property(PeerPressureVertexProgram.CLUSTER);
+        if (cluster.isPresent()) {
+            emitter.emit(NullObject.instance(), cluster.value());
+        }
+    }
+
+    /**
+     * Delegates to {@link #reduce}. Not scheduled by {@link #doStage}; kept so direct callers still work.
+     */
+    @Override
+    public void combine(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
+        this.reduce(key, values, emitter);
+    }
+
+    /**
+     * Emits the number of distinct values in {@code values} under the shared null key.
+     */
+    @Override
+    public void reduce(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
+        final Set<Serializable> set = new HashSet<>();
+        values.forEachRemaining(set::add);
+        emitter.emit(NullObject.instance(), set.size());
+
+    }
+
+    /**
+     * Returns the value of the first reduced pair, or {@code 0} when there are no pairs at all
+     * (e.g. no vertex carried a cluster value, so nothing was ever emitted).
+     */
+    @Override
+    public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
+        if (!keyValues.hasNext()) {
+            return 0;
+        }
+        return keyValues.next().getValue();
+    }
+
+    @Override
+    public String getMemoryKey() {
+        return this.memoryKey;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.mapReduceString(this, this.memoryKey);
+    }
+
+    //////////////////////////////
+
+    /**
+     * @return a new {@link Builder} preset with {@link #DEFAULT_MEMORY_KEY}
+     */
+    public static Builder build() {
+        return new Builder();
+    }
+
+    /**
+     * Fluent builder for {@link ClusterCountMapReduce}.
+     */
+    public static class Builder {
+
+        private String memoryKey = DEFAULT_MEMORY_KEY;
+
+        private Builder() {
+
+        }
+
+        /**
+         * Sets the memory key that the created instance reports from {@code getMemoryKey()}.
+         */
+        public Builder memoryKey(final String memoryKey) {
+            this.memoryKey = memoryKey;
+            return this;
+        }
+
+        public ClusterCountMapReduce create() {
+            return new ClusterCountMapReduce(this.memoryKey);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
new file mode 100644
index 0000000..a8a400d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Property;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+/**
+ * A {@link StaticMapReduce} that tallies how many vertices carry each value stored under
+ * {@link PeerPressureVertexProgram#CLUSTER}. The final result maps each cluster value to its tally.
+ */
+public class ClusterPopulationMapReduce extends StaticMapReduce<Serializable, Long, Serializable, Long, Map<Serializable, Long>> {
+
+    public static final String CLUSTER_POPULATION_MEMORY_KEY = "gremlin.clusterPopulationMapReduce.memoryKey";
+    public static final String DEFAULT_MEMORY_KEY = "clusterPopulation";
+
+    private String memoryKey = DEFAULT_MEMORY_KEY;
+
+    private ClusterPopulationMapReduce() {
+    }
+
+    private ClusterPopulationMapReduce(final String memoryKey) {
+        this.memoryKey = memoryKey;
+    }
+
+    /**
+     * Stores the superclass state and then the memory key.
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        configuration.setProperty(CLUSTER_POPULATION_MEMORY_KEY, this.memoryKey);
+    }
+
+    /**
+     * Restores the memory key, defaulting to {@link #DEFAULT_MEMORY_KEY} when it is not configured.
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        this.memoryKey = configuration.getString(CLUSTER_POPULATION_MEMORY_KEY, DEFAULT_MEMORY_KEY);
+    }
+
+    /**
+     * Every stage (map, combine, reduce) is executed.
+     */
+    @Override
+    public boolean doStage(final Stage stage) {
+        return true;
+    }
+
+    /**
+     * Emits {@code (cluster, 1)} for a vertex that has a cluster value; emits nothing otherwise.
+     */
+    @Override
+    public void map(final Vertex vertex, final MapEmitter<Serializable, Long> emitter) {
+        final Property<Serializable> cluster = vertex.property(PeerPressureVertexProgram.CLUSTER);
+        if (!cluster.isPresent()) {
+            return;
+        }
+        emitter.emit(cluster.value(), 1L);
+    }
+
+    /**
+     * Pre-aggregates with the same summation used by {@link #reduce}.
+     */
+    @Override
+    public void combine(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
+        this.reduce(key, values, emitter);
+    }
+
+    /**
+     * Sums all values for the key and emits {@code (key, sum)}.
+     */
+    @Override
+    public void reduce(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
+        long total = 0L;
+        for (; values.hasNext(); ) {
+            total += values.next();
+        }
+        emitter.emit(key, total);
+    }
+
+    /**
+     * Copies every reduced pair into a fresh {@link HashMap}; a later pair with the same key overwrites an earlier one.
+     */
+    @Override
+    public Map<Serializable, Long> generateFinalResult(final Iterator<KeyValue<Serializable, Long>> keyValues) {
+        final Map<Serializable, Long> populationByCluster = new HashMap<>();
+        while (keyValues.hasNext()) {
+            final KeyValue<Serializable, Long> entry = keyValues.next();
+            populationByCluster.put(entry.getKey(), entry.getValue());
+        }
+        return populationByCluster;
+    }
+
+    @Override
+    public String getMemoryKey() {
+        return this.memoryKey;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.mapReduceString(this, this.memoryKey);
+    }
+
+    //////////////////////////////
+
+    /**
+     * @return a new {@link Builder} preset with {@link #DEFAULT_MEMORY_KEY}
+     */
+    public static Builder build() {
+        return new Builder();
+    }
+
+    /**
+     * Fluent builder for {@link ClusterPopulationMapReduce}.
+     */
+    public static class Builder {
+
+        private String memoryKey = DEFAULT_MEMORY_KEY;
+
+        private Builder() {
+
+        }
+
+        /**
+         * Sets the memory key handed to the created instance.
+         */
+        public Builder memoryKey(final String memoryKey) {
+            this.memoryKey = memoryKey;
+            return this;
+        }
+
+        public ClusterPopulationMapReduce create() {
+            final String key = this.memoryKey;
+            return new ClusterPopulationMapReduce(key);
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
new file mode 100644
index 0000000..98fafcc
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.MessageScope;
+import com.tinkerpop.gremlin.process.computer.Messenger;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
+import com.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import com.tinkerpop.gremlin.process.graph.traversal.__;
+import com.tinkerpop.gremlin.process.util.MapHelper;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.VertexProperty;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.StreamFactory;
+import org.apache.commons.configuration.Configuration;
+import org.javatuples.Pair;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+/**
+ * A vertex program that clusters vertices by peer pressure. Each vertex starts in a cluster named by
+ * its own id and sends {@code (cluster, voteStrength)} along the vote scope. In later iterations a
+ * vertex sums the vote strengths it receives per cluster (together with its own vote) and adopts the
+ * cluster with the largest sum. The program terminates once {@code VOTE_TO_HALT} is true in memory or
+ * the iteration limit is reached.
+ * <p>
+ * When {@code distributeVote} is enabled, an extra initial iteration sends a count message along the
+ * reversed vote scope so each vertex can set its vote strength to {@code 1 / (sum of counts received)}.
+ */
+public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializable, Double>> {
+
+    // Both scopes are replaced in loadState() when a custom incident traversal is configured,
+    // so anything derived from them must be computed on demand rather than cached at construction.
+    private MessageScope.Local<?> voteScope = MessageScope.Local.of(__::outE);
+    private MessageScope.Local<?> countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
+
+    public static final String CLUSTER = "gremlin.peerPressureVertexProgram.cluster";
+    public static final String VOTE_STRENGTH = "gremlin.peerPressureVertexProgram.voteStrength";
+
+    private static final String MAX_ITERATIONS = "gremlin.peerPressureVertexProgram.maxIterations";
+    private static final String DISTRIBUTE_VOTE = "gremlin.peerPressureVertexProgram.distributeVote";
+    private static final String INCIDENT_TRAVERSAL_SUPPLIER = "gremlin.peerPressureVertexProgram.incidentTraversalSupplier";
+    private static final String VOTE_TO_HALT = "gremlin.peerPressureVertexProgram.voteToHalt";
+
+    private LambdaHolder<Supplier<Traversal<Vertex, Edge>>> traversalSupplier;
+    private int maxIterations = 30;
+    private boolean distributeVote = false;
+
+    private static final Set<String> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(CLUSTER, VOTE_STRENGTH));
+    private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
+
+    private PeerPressureVertexProgram() {
+
+    }
+
+    /**
+     * Loads the optional incident traversal (rebuilding the vote and count scopes from it), the
+     * iteration limit (default 30) and the distribute-vote flag (default false).
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        this.traversalSupplier = LambdaHolder.loadState(configuration, INCIDENT_TRAVERSAL_SUPPLIER);
+        if (null != this.traversalSupplier) {
+            VertexProgramHelper.verifyReversibility(this.traversalSupplier.get().get().asAdmin());
+            this.voteScope = MessageScope.Local.of(this.traversalSupplier.get());
+            this.countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
+        }
+        this.maxIterations = configuration.getInt(MAX_ITERATIONS, 30);
+        this.distributeVote = configuration.getBoolean(DISTRIBUTE_VOTE, false);
+    }
+
+    /**
+     * Stores the superclass state, the incident traversal if one was configured, the iteration limit
+     * and the distribute-vote flag.
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        // loadState() leaves traversalSupplier null when no incident traversal is configured.
+        if (null != this.traversalSupplier)
+            this.traversalSupplier.storeState(configuration);
+        configuration.setProperty(MAX_ITERATIONS, this.maxIterations);
+        configuration.setProperty(DISTRIBUTE_VOTE, this.distributeVote);
+
+    }
+
+    @Override
+    public Set<String> getElementComputeKeys() {
+        return ELEMENT_COMPUTE_KEYS;
+    }
+
+    @Override
+    public Set<String> getMemoryComputeKeys() {
+        return MEMORY_COMPUTE_KEYS;
+    }
+
+    /**
+     * Returns the count scope for the initial iteration of a distribute-vote run and the vote scope
+     * otherwise. The set is built from the current scope fields on every call so that scopes
+     * installed by {@link #loadState} are honoured.
+     */
+    @Override
+    public Set<MessageScope> getMessageScopes(final Memory memory) {
+        final MessageScope scope = this.distributeVote && memory.isInitialIteration() ? this.countScope : this.voteScope;
+        return new HashSet<>(Collections.singletonList(scope));
+    }
+
+    @Override
+    public void setup(final Memory memory) {
+        memory.set(VOTE_TO_HALT, false);
+    }
+
+    /**
+     * Runs one iteration for {@code vertex}: either the count broadcast, the initial vote, or a
+     * re-vote based on the messages received over the vote scope.
+     */
+    @Override
+    public void execute(final Vertex vertex, Messenger<Pair<Serializable, Double>> messenger, final Memory memory) {
+        if (memory.isInitialIteration()) {
+            if (this.distributeVote) {
+                // the payload key is irrelevant here; receivers only sum the second value
+                messenger.sendMessage(this.countScope, Pair.with('c', 1.0d));
+            } else {
+                double voteStrength = 1.0d;
+                vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
+                vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
+                messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
+                memory.and(VOTE_TO_HALT, false);
+            }
+        } else if (1 == memory.getIteration() && this.distributeVote) {
+            // NOTE(review): a vertex that received no count messages divides by 0.0 and gets an infinite vote strength
+            double voteStrength = 1.0d / StreamFactory.stream(messenger.receiveMessages(this.countScope)).map(Pair::getValue1).reduce(0.0d, (a, b) -> a + b);
+            vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
+            vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
+            messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
+            memory.and(VOTE_TO_HALT, false);
+        } else {
+            final Map<Serializable, Double> votes = new HashMap<>();
+            // the vertex's own current cluster counts as one vote
+            votes.put(vertex.value(CLUSTER), vertex.<Double>value(VOTE_STRENGTH));
+            messenger.receiveMessages(this.voteScope).forEach(message -> MapHelper.incr(votes, message.getValue0(), message.getValue1()));
+            Serializable cluster = PeerPressureVertexProgram.largestCount(votes);
+            if (null == cluster) cluster = (Serializable) vertex.id();
+            memory.and(VOTE_TO_HALT, vertex.value(CLUSTER).equals(cluster));
+            vertex.property(VertexProperty.Cardinality.single, CLUSTER, cluster);
+            messenger.sendMessage(this.voteScope, new Pair<>(cluster, vertex.<Double>value(VOTE_STRENGTH)));
+        }
+    }
+
+    /**
+     * Terminates when {@code VOTE_TO_HALT} is true or the iteration limit is reached (one extra
+     * iteration is allowed for the count broadcast when votes are distributed). Otherwise resets
+     * {@code VOTE_TO_HALT} to true for the next iteration and continues.
+     */
+    @Override
+    public boolean terminate(final Memory memory) {
+        final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT) || memory.getIteration() >= (this.distributeVote ? this.maxIterations + 1 : this.maxIterations);
+        if (voteToHalt) {
+            return true;
+        } else {
+            memory.or(VOTE_TO_HALT, true);
+            return false;
+        }
+    }
+
+    /**
+     * Finds the key with the largest value. Ties are broken in favour of the key whose
+     * {@code toString()} is lexicographically smallest.
+     *
+     * @param map the candidate keys and their scores
+     * @return the winning key, or {@code null} if the map is empty or no value exceeds negative infinity
+     */
+    private static <T> T largestCount(final Map<T, Double> map) {
+        T largestKey = null;
+        // Double.MIN_VALUE is the smallest POSITIVE double, so it would wrongly skip zero/negative scores.
+        double largestValue = Double.NEGATIVE_INFINITY;
+        for (Map.Entry<T, Double> entry : map.entrySet()) {
+            final double value = entry.getValue();
+            if (value == largestValue) {
+                if (null != largestKey && largestKey.toString().compareTo(entry.getKey().toString()) > 0) {
+                    largestKey = entry.getKey();
+                }
+            } else if (value > largestValue) {
+                largestKey = entry.getKey();
+                largestValue = value;
+            }
+        }
+        return largestKey;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.vertexProgramString(this, "distributeVote=" + this.distributeVote + ",maxIterations=" + this.maxIterations);
+    }
+
+    //////////////////////////////
+
+    public static Builder build() {
+        return new Builder();
+    }
+
+    /**
+     * Fluent builder that records the program's settings in the inherited configuration.
+     */
+    public static class Builder extends AbstractVertexProgramBuilder<Builder> {
+
+
+        private Builder() {
+            super(PeerPressureVertexProgram.class);
+        }
+
+        public Builder maxIterations(final int iterations) {
+            this.configuration.setProperty(MAX_ITERATIONS, iterations);
+            return this;
+        }
+
+        public Builder distributeVote(final boolean distributeVote) {
+            this.configuration.setProperty(DISTRIBUTE_VOTE, distributeVote);
+            return this;
+        }
+
+        public Builder incident(final String scriptEngine, final String traversalScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, INCIDENT_TRAVERSAL_SUPPLIER, new String[]{scriptEngine, traversalScript});
+            return this;
+        }
+
+        public Builder incident(final String traversalScript) {
+            return incident(GREMLIN_GROOVY, traversalScript);
+        }
+
+        public Builder incident(final Supplier<Traversal<Vertex, Edge>> traversal) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, INCIDENT_TRAVERSAL_SUPPLIER, traversal);
+            return this;
+        }
+
+        public Builder incident(final Class<Supplier<Traversal<Vertex, Edge>>> traversalClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, INCIDENT_TRAVERSAL_SUPPLIER, traversalClass);
+            return this;
+        }
+
+    }
+
+    ////////////////////////////
+
+    /**
+     * Declares that this program needs local message scopes and adds vertex properties.
+     */
+    @Override
+    public Features getFeatures() {
+        return new Features() {
+            @Override
+            public boolean requiresLocalMessageScopes() {
+                return true;
+            }
+
+            @Override
+            public boolean requiresVertexPropertyAddition() {
+                return true;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
new file mode 100644
index 0000000..13caa2f
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.lambda;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.function.TriConsumer;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+/**
+ * A {@link StaticMapReduce} whose map, combine, reduce, key-sort and final-result steps are supplied
+ * as lambdas, classes or scripts held in {@link LambdaHolder}s. A stage is only executed when its
+ * corresponding holder was configured.
+ */
+public class LambdaMapReduce<MK, MV, RK, RV, R> extends StaticMapReduce<MK, MV, RK, RV, R> {
+
+    public static final String MAP_LAMBDA = "gremlin.lambdaMapReduce.mapLambda";
+    public static final String MAP_KEY_SORT = "gremlin.lambdaMapReduce.mapKeySort";
+    public static final String COMBINE_LAMBDA = "gremlin.lambdaMapReduce.combineLambda";
+    public static final String REDUCE_LAMBDA = "gremlin.lambdaMapReduce.reduceLambda";
+    public static final String REDUCE_KEY_SORT = "gremlin.lambdaMapReduce.reduceKeySort";
+    public static final String MEMORY_LAMBDA = "gremlin.lambdaMapReduce.memoryLambda";
+    public static final String MEMORY_KEY = "gremlin.lambdaMapReduce.memoryKey";
+
+    private LambdaHolder<BiConsumer<Vertex, MapEmitter<MK, MV>>> mapLambdaHolder;
+    private LambdaHolder<Supplier<Comparator<MK>>> mapKeySortLambdaHolder;
+    private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> combineLambdaHolder;
+    private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> reduceLambdaHolder;
+    private LambdaHolder<Supplier<Comparator<RK>>> reduceKeySortLambdaHolder;
+    private LambdaHolder<Function<Iterator<KeyValue<RK, RV>>, R>> memoryLambdaHolder;
+    private String memoryKey;
+
+    private LambdaMapReduce() {
+
+    }
+
+    /**
+     * Loads every holder and the memory key from the configuration. The null checks elsewhere in
+     * this class treat a holder that was not configured as {@code null}.
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        this.mapLambdaHolder = LambdaHolder.loadState(configuration, MAP_LAMBDA);
+        this.mapKeySortLambdaHolder = LambdaHolder.loadState(configuration, MAP_KEY_SORT);
+        this.combineLambdaHolder = LambdaHolder.loadState(configuration, COMBINE_LAMBDA);
+        this.reduceLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_LAMBDA);
+        this.reduceKeySortLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_KEY_SORT);
+        this.memoryLambdaHolder = LambdaHolder.loadState(configuration, MEMORY_LAMBDA);
+        this.memoryKey = configuration.getString(MEMORY_KEY, null);
+    }
+
+    /**
+     * Stores the superclass state, every configured (non-null) holder and the memory key.
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        if (null != this.mapLambdaHolder)
+            this.mapLambdaHolder.storeState(configuration);
+        if (null != this.mapKeySortLambdaHolder)
+            this.mapKeySortLambdaHolder.storeState(configuration);
+        if (null != this.combineLambdaHolder)
+            this.combineLambdaHolder.storeState(configuration);
+        if (null != this.reduceLambdaHolder)
+            this.reduceLambdaHolder.storeState(configuration);
+        if (null != this.reduceKeySortLambdaHolder)
+            this.reduceKeySortLambdaHolder.storeState(configuration);
+        if (null != this.memoryLambdaHolder)
+            this.memoryLambdaHolder.storeState(configuration);
+        configuration.setProperty(MEMORY_KEY, this.memoryKey);
+    }
+
+    /**
+     * A stage runs only if its lambda was configured; any stage other than MAP or COMBINE is
+     * answered on behalf of the reduce lambda.
+     */
+    @Override
+    public boolean doStage(final Stage stage) {
+        if (stage.equals(Stage.MAP))
+            return null != this.mapLambdaHolder;
+        else if (stage.equals(Stage.COMBINE))
+            return null != this.combineLambdaHolder;
+        else
+            return null != this.reduceLambdaHolder;
+    }
+
+    @Override
+    public void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
+        this.mapLambdaHolder.get().accept(vertex, emitter);
+    }
+
+    @Override
+    public void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+        this.combineLambdaHolder.get().accept(key, values, emitter);
+    }
+
+    @Override
+    public void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+        this.reduceLambdaHolder.get().accept(key, values, emitter);
+    }
+
+    /**
+     * @return the configured map-key comparator, or empty when none was configured
+     */
+    @Override
+    public Optional<Comparator<MK>> getMapKeySort() {
+        return null == this.mapKeySortLambdaHolder ? Optional.empty() : Optional.of(this.mapKeySortLambdaHolder.get().get());
+    }
+
+    /**
+     * @return the configured reduce-key comparator, or empty when none was configured
+     */
+    @Override
+    public Optional<Comparator<RK>> getReduceKeySort() {
+        return null == this.reduceKeySortLambdaHolder ? Optional.empty() : Optional.of(this.reduceKeySortLambdaHolder.get().get());
+    }
+
+    /**
+     * Applies the memory lambda to the reduced pairs; without one, the iterator itself is returned
+     * (unchecked cast to {@code R}).
+     */
+    @Override
+    public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues) {
+        return null == this.memoryLambdaHolder ? (R) keyValues : this.memoryLambdaHolder.get().apply(keyValues);
+    }
+
+    @Override
+    public String getMemoryKey() {
+        return this.memoryKey;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.mapReduceString(this, this.memoryKey);
+    }
+
+    //////////////////
+
+    public static <MK, MV, RK, RV, R> Builder<MK, MV, RK, RV, R> build() {
+        return new Builder<>();
+    }
+
+    /**
+     * Fluent builder that records each step in a private configuration. Each step can be given as an
+     * object (lambda), a class, a script for a named script engine, or a script for the default
+     * {@code GREMLIN_GROOVY} engine.
+     */
+    public static class Builder<MK, MV, RK, RV, R> {
+
+        private final Configuration configuration = new BaseConfiguration();
+
+        public Builder<MK, MV, RK, RV, R> map(final BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>> mapLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_LAMBDA, mapLambda);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> map(final Class<? extends BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>>> mapClass) {
+            // a class must be stored as Type.CLASS (as every other Class overload here does), not Type.SCRIPT
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MAP_LAMBDA, mapClass);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> map(final String scriptEngine, final String mapScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_LAMBDA, new String[]{scriptEngine, mapScript});
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> map(final String setupScript) {
+            return map(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+        }
+
+        //
+
+        // NOTE(review): the holder read by getMapKeySort() is typed Supplier<Comparator<MK>>, whereas these
+        // two overloads store a Comparator (or Comparator class) directly, unlike reduceKeySort(Supplier).
+        // Confirm LambdaHolder tolerates this, otherwise getMapKeySort() will fail with a ClassCastException.
+        public Builder<MK, MV, RK, RV, R> mapKeySort(final Comparator<MK> comparator) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_KEY_SORT, comparator);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> mapKeySort(final Class<? extends Comparator<MK>> comparatorClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MAP_KEY_SORT, comparatorClass);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> mapKeySort(final String scriptEngine, final String reduceScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_KEY_SORT, new String[]{scriptEngine, reduceScript});
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> mapKeySort(final String setupScript) {
+            return mapKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+        }
+
+        ////////////
+
+        public Builder<MK, MV, RK, RV, R> combine(TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> combineLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, COMBINE_LAMBDA, combineLambda);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> combine(final Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> combineClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, COMBINE_LAMBDA, combineClass);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> combine(final String scriptEngine, final String combineScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, COMBINE_LAMBDA, new String[]{scriptEngine, combineScript});
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> combine(final String setupScript) {
+            return combine(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+        }
+
+        ////////////
+
+        public Builder<MK, MV, RK, RV, R> reduce(TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> reduceLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_LAMBDA, reduceLambda);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> reduce(Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> reduceClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_LAMBDA, reduceClass);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> reduce(final String scriptEngine, final String reduceScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_LAMBDA, new String[]{scriptEngine, reduceScript});
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> reduce(final String setupScript) {
+            return reduce(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+        }
+
+        //
+
+        public Builder<MK, MV, RK, RV, R> reduceKeySort(final Supplier<Comparator<RK>> comparator) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_KEY_SORT, comparator);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> reduceKeySort(final Class<? extends Supplier<Comparator<RK>>> comparatorClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_KEY_SORT, comparatorClass);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> reduceKeySort(final String scriptEngine, final String reduceScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_KEY_SORT, new String[]{scriptEngine, reduceScript});
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> reduceKeySort(final String setupScript) {
+            return reduceKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+        }
+
+        ////////////
+
+        public Builder<MK, MV, RK, RV, R> memory(Function<Iterator<KeyValue<RK, RV>>, R> memoryLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MEMORY_LAMBDA, memoryLambda);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> memory(Class<? extends Function<Iterator<KeyValue<RK, RV>>, R>> memoryClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MEMORY_LAMBDA, memoryClass);
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> memory(final String scriptEngine, final String memoryScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MEMORY_LAMBDA, new String[]{scriptEngine, memoryScript});
+            return this;
+        }
+
+        public Builder<MK, MV, RK, RV, R> memory(final String setupScript) {
+            return memory(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+        }
+
+        ////////////
+
+        public Builder<MK, MV, RK, RV, R> memoryKey(final String memoryKey) {
+            this.configuration.setProperty(LambdaMapReduce.MEMORY_KEY, memoryKey);
+            return this;
+        }
+
+        /**
+         * Creates a new instance and initializes it from everything recorded on this builder.
+         */
+        public LambdaMapReduce<MK, MV, RK, RV, R> create() {
+            LambdaMapReduce<MK, MV, RK, RV, R> lambdaMapReduce = new LambdaMapReduce<>();
+            lambdaMapReduce.loadState(this.configuration);
+            return lambdaMapReduce;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java
new file mode 100644
index 0000000..45bd9b9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.lambda;
+
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.MessageScope;
+import com.tinkerpop.gremlin.process.computer.Messenger;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
+import com.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.function.TriConsumer;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * A vertex program whose setup, execute and terminate behavior is supplied as lambdas through
+ * {@link Builder}. Each lambda is written to, and read back from, a {@link Configuration} via a
+ * {@link LambdaHolder}. A phase for which no holder could be loaded falls back to a default:
+ * a no-op setup, a no-op execute, and a terminate that always returns {@code true}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class LambdaVertexProgram<M extends Serializable> extends StaticVertexProgram<M> {
+
+    // The scopes reported by getMessageScopes(): only the global message scope.
+    private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
+
+    // Configuration keys under which the lambdas and the compute-key sets are stored.
+    private static final String SETUP_LAMBDA = "gremlin.lambdaVertexProgram.setupLambda";
+    private static final String EXECUTE_LAMBDA = "gremlin.lambdaVertexProgram.executeLambda";
+    private static final String TERMINATE_LAMBDA = "gremlin.lambdaVertexProgram.terminateLambda";
+    private static final String ELEMENT_COMPUTE_KEYS = "gremlin.lambdaVertexProgram.elementComputeKeys";
+    private static final String MEMORY_COMPUTE_KEYS = "gremlin.lambdaVertexProgram.memoryComputeKeys";
+
+    // Each holder is kept next to the lambda it produced so that storeState() can write it back.
+    private LambdaHolder<Consumer<Memory>> setupLambdaHolder;
+    private Consumer<Memory> setupLambda;
+    private LambdaHolder<TriConsumer<Vertex, Messenger<M>, Memory>> executeLambdaHolder;
+    private TriConsumer<Vertex, Messenger<M>, Memory> executeLambda;
+    private LambdaHolder<Predicate<Memory>> terminateLambdaHolder;
+    private Predicate<Memory> terminateLambda;
+    private Set<String> elementComputeKeys;
+    private Set<String> memoryComputeKeys;
+
+    // Private: the public entry point of this class is build().
+    private LambdaVertexProgram() {
+    }
+
+    /**
+     * Loads the three lambda holders and the two compute-key sets from the given configuration.
+     *
+     * @param configuration the configuration to read from
+     * @throws IllegalStateException if deserializing either compute-key set fails
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        this.setupLambdaHolder = LambdaHolder.loadState(configuration, SETUP_LAMBDA);
+        this.executeLambdaHolder = LambdaHolder.loadState(configuration, EXECUTE_LAMBDA);
+        this.terminateLambdaHolder = LambdaHolder.loadState(configuration, TERMINATE_LAMBDA);
+        // A null holder means no lambda was loaded for that phase; use the default behavior instead.
+        this.setupLambda = null == this.setupLambdaHolder ? memory -> {
+        } : this.setupLambdaHolder.get();
+        this.executeLambda = null == this.executeLambdaHolder ? (vertex, messenger, memory) -> {
+        } : this.executeLambdaHolder.get();
+        this.terminateLambda = null == this.terminateLambdaHolder ? memory -> true : this.terminateLambdaHolder.get();
+
+        try {
+            // A missing key yields an empty set rather than an attempt to deserialize.
+            this.elementComputeKeys = configuration.containsKey(ELEMENT_COMPUTE_KEYS) ?
+                    VertexProgramHelper.deserialize(configuration, ELEMENT_COMPUTE_KEYS) : Collections.emptySet();
+            this.memoryComputeKeys = configuration.containsKey(MEMORY_COMPUTE_KEYS) ?
+                    VertexProgramHelper.deserialize(configuration, MEMORY_COMPUTE_KEYS) : Collections.emptySet();
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Writes the superclass state, every non-null lambda holder and both compute-key sets
+     * to the given configuration.
+     *
+     * @param configuration the configuration to write to
+     * @throws IllegalStateException if serializing either compute-key set fails
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        if (null != this.setupLambdaHolder)
+            this.setupLambdaHolder.storeState(configuration);
+        if (null != this.executeLambdaHolder)
+            this.executeLambdaHolder.storeState(configuration);
+        if (null != this.terminateLambdaHolder)
+            this.terminateLambdaHolder.storeState(configuration);
+
+        try {
+            VertexProgramHelper.serialize(this.elementComputeKeys, configuration, ELEMENT_COMPUTE_KEYS);
+            VertexProgramHelper.serialize(this.memoryComputeKeys, configuration, MEMORY_COMPUTE_KEYS);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Passes the memory to the setup lambda.
+     */
+    @Override
+    public void setup(final Memory memory) {
+        this.setupLambda.accept(memory);
+    }
+
+
+    /**
+     * Passes the vertex, messenger and memory to the execute lambda.
+     */
+    @Override
+    public void execute(final Vertex vertex, final Messenger<M> messenger, final Memory memory) {
+        this.executeLambda.accept(vertex, messenger, memory);
+    }
+
+    /**
+     * Returns the result of testing the memory with the terminate lambda.
+     */
+    @Override
+    public boolean terminate(final Memory memory) {
+        return this.terminateLambda.test(memory);
+    }
+
+    /**
+     * Returns the element compute keys obtained in {@link #loadState(Configuration)}.
+     */
+    @Override
+    public Set<String> getElementComputeKeys() {
+        return this.elementComputeKeys;
+    }
+
+    /**
+     * Returns the memory compute keys obtained in {@link #loadState(Configuration)}.
+     */
+    @Override
+    public Set<String> getMemoryComputeKeys() {
+        return this.memoryComputeKeys;
+    }
+
+    /**
+     * Returns the fixed set of message scopes, which contains only the global scope;
+     * the memory argument is not consulted.
+     */
+    @Override
+    public Set<MessageScope> getMessageScopes(final Memory memory) {
+        return MESSAGE_SCOPES;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.vertexProgramString(this);
+    }
+
+    //////////////////////////////
+
+    /**
+     * Creates a new, empty {@link Builder}.
+     */
+    public static Builder build() {
+        return new Builder();
+    }
+
+    /**
+     * Collects the lambdas and compute keys of a {@link LambdaVertexProgram} into the builder's
+     * configuration. Every lambda can be given as an object, as a class, or as a script
+     * (with an explicit script engine name, or with {@code GREMLIN_GROOVY} as the engine name).
+     */
+    public static class Builder extends AbstractVertexProgramBuilder<Builder> {
+
+
+        private Builder() {
+            super(LambdaVertexProgram.class);
+        }
+
+        /**
+         * Stores the setup lambda as an object.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder setup(final Consumer<Memory> setupLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, SETUP_LAMBDA, setupLambda);
+            return this;
+        }
+
+        /**
+         * Stores the setup lambda as a class.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder setup(final Class<? extends Consumer<Memory>> setupClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, SETUP_LAMBDA, setupClass);
+            return this;
+        }
+
+        /**
+         * Stores the setup lambda as a script for the named script engine.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder setup(final String scriptEngine, final String setupScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, SETUP_LAMBDA, new String[]{scriptEngine, setupScript});
+            return this;
+        }
+
+        /**
+         * Stores the setup lambda as a script, using {@code GREMLIN_GROOVY} as the script engine name.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder setup(final String setupScript) {
+            return setup(GREMLIN_GROOVY, setupScript);
+        }
+
+        ///////
+
+        /**
+         * Stores the execute lambda as an object.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder execute(final TriConsumer<Vertex, Messenger, Memory> executeLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, EXECUTE_LAMBDA, executeLambda);
+            return this;
+        }
+
+        /**
+         * Stores the execute lambda as a class.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder execute(final Class<? extends TriConsumer<Vertex, Messenger, Memory>> executeClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, EXECUTE_LAMBDA, executeClass);
+            return this;
+        }
+
+        /**
+         * Stores the execute lambda as a script for the named script engine.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder execute(final String scriptEngine, final String executeScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, EXECUTE_LAMBDA, new String[]{scriptEngine, executeScript});
+            return this;
+        }
+
+        /**
+         * Stores the execute lambda as a script, using {@code GREMLIN_GROOVY} as the script engine name.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder execute(final String executeScript) {
+            return execute(GREMLIN_GROOVY, executeScript);
+        }
+
+        ///////
+
+        /**
+         * Stores the terminate lambda as an object.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder terminate(final Predicate<Memory> terminateLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, TERMINATE_LAMBDA, terminateLambda);
+            return this;
+        }
+
+        /**
+         * Stores the terminate lambda as a class.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder terminate(final Class<? extends Predicate<Memory>> terminateClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, TERMINATE_LAMBDA, terminateClass);
+            return this;
+        }
+
+        /**
+         * Stores the terminate lambda as a script for the named script engine.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder terminate(final String scriptEngine, final String terminateScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, TERMINATE_LAMBDA, new String[]{scriptEngine, terminateScript});
+            return this;
+        }
+
+        /**
+         * Stores the terminate lambda as a script, using {@code GREMLIN_GROOVY} as the script engine name.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder terminate(final String terminateScript) {
+            return terminate(GREMLIN_GROOVY, terminateScript);
+        }
+
+        ///////
+
+        /**
+         * Serializes the given memory compute keys into the configuration.
+         *
+         * @return this builder, to allow call chaining
+         * @throws IllegalStateException if serialization fails
+         */
+        public Builder memoryComputeKeys(final Set<String> memoryComputeKeys) {
+            try {
+                VertexProgramHelper.serialize(memoryComputeKeys, configuration, MEMORY_COMPUTE_KEYS);
+                return this;
+            } catch (Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        /**
+         * Serializes the given element compute keys into the configuration.
+         *
+         * @return this builder, to allow call chaining
+         * @throws IllegalStateException if serialization fails
+         */
+        public Builder elementComputeKeys(final Set<String> elementComputeKeys) {
+            try {
+                VertexProgramHelper.serialize(elementComputeKeys, configuration, ELEMENT_COMPUTE_KEYS);
+                return this;
+            } catch (Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        /**
+         * Varargs convenience: copies the keys into a {@link HashSet} and delegates to
+         * {@link #memoryComputeKeys(Set)}.
+         */
+        public Builder memoryComputeKeys(final String... memoryComputeKeys) {
+            return this.memoryComputeKeys(new HashSet<>(Arrays.asList(memoryComputeKeys)));
+        }
+
+        /**
+         * Varargs convenience: copies the keys into a {@link HashSet} and delegates to
+         * {@link #elementComputeKeys(Set)}.
+         */
+        public Builder elementComputeKeys(final String... elementComputeKeys) {
+            return this.elementComputeKeys(new HashSet<>(Arrays.asList(elementComputeKeys)));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java
new file mode 100644
index 0000000..c29bdb8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.ranking.pagerank;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Property;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Iterator;
+
+/**
+ * A {@link StaticMapReduce} that runs only the map stage: for every vertex on which the
+ * {@link PageRankVertexProgram#PAGE_RANK} property is present, it emits the vertex id paired
+ * with that property's value. The resulting key/values are returned unchanged as the final result.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PageRankMapReduce extends StaticMapReduce<Object, Double, Object, Double, Iterator<KeyValue<Object, Double>>> {
+
+    public static final String PAGE_RANK_MEMORY_KEY = "gremlin.pageRankMapReduce.memoryKey";
+    public static final String DEFAULT_MEMORY_KEY = "pageRank";
+
+    // Starts out as the default; replaced by the builder-supplied key or by loadState().
+    private String memoryKey = DEFAULT_MEMORY_KEY;
+
+    private PageRankMapReduce() {
+
+    }
+
+    private PageRankMapReduce(final String memoryKey) {
+        this.memoryKey = memoryKey;
+    }
+
+    /**
+     * Writes the superclass state and then the memory key, under {@link #PAGE_RANK_MEMORY_KEY}.
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        final String keyToStore = this.memoryKey;
+        configuration.setProperty(PAGE_RANK_MEMORY_KEY, keyToStore);
+    }
+
+    /**
+     * Reads the memory key from {@link #PAGE_RANK_MEMORY_KEY}, with {@link #DEFAULT_MEMORY_KEY}
+     * passed as the default value.
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        final String loadedKey = configuration.getString(PAGE_RANK_MEMORY_KEY, DEFAULT_MEMORY_KEY);
+        this.memoryKey = loadedKey;
+    }
+
+    /**
+     * Returns {@code true} only for the map stage.
+     */
+    @Override
+    public boolean doStage(final Stage stage) {
+        return stage.equals(Stage.MAP);
+    }
+
+    /**
+     * Emits the vertex id with its page rank value; vertices without the property emit nothing.
+     */
+    @Override
+    public void map(final Vertex vertex, final MapEmitter<Object, Double> emitter) {
+        final Property rank = vertex.property(PageRankVertexProgram.PAGE_RANK);
+        if (!rank.isPresent()) {
+            return;
+        }
+        final Object vertexId = vertex.id();
+        final Double rankValue = (Double) rank.value();
+        emitter.emit(vertexId, rankValue);
+    }
+
+    /**
+     * Returns the given key/value iterator as-is.
+     */
+    @Override
+    public Iterator<KeyValue<Object, Double>> generateFinalResult(final Iterator<KeyValue<Object, Double>> keyValues) {
+        return keyValues;
+    }
+
+    @Override
+    public String getMemoryKey() {
+        return this.memoryKey;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.mapReduceString(this, this.memoryKey);
+    }
+
+    //////////////////////////////
+
+    /**
+     * Creates a new {@link Builder} preset with {@link #DEFAULT_MEMORY_KEY}.
+     */
+    public static Builder build() {
+        return new Builder();
+    }
+
+    /**
+     * Builds a {@link PageRankMapReduce} with a configurable memory key.
+     */
+    public static class Builder {
+
+        private String key = DEFAULT_MEMORY_KEY;
+
+        private Builder() {
+
+        }
+
+        /**
+         * Sets the memory key handed to the created instance.
+         *
+         * @return this builder, to allow call chaining
+         */
+        public Builder memoryKey(final String memoryKey) {
+            this.key = memoryKey;
+            return this;
+        }
+
+        /**
+         * Creates a new {@link PageRankMapReduce} with the currently set memory key.
+         */
+        public PageRankMapReduce create() {
+            return new PageRankMapReduce(this.key);
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java
new file mode 100644
index 0000000..43eae71
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.ranking.pagerank;
+
+import com.tinkerpop.gremlin.process.computer.MessageCombiner;
+
+import java.util.Optional;
+
+/**
+ * A {@link MessageCombiner} over {@link Double} messages that combines two messages by adding them.
+ * A single shared instance, already wrapped in an {@link Optional}, is available from {@link #instance()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PageRankMessageCombiner implements MessageCombiner<Double> {
+
+    // Created once and pre-wrapped, so instance() hands out the same Optional every time.
+    private static final Optional<PageRankMessageCombiner> INSTANCE = Optional.of(new PageRankMessageCombiner());
+
+    private PageRankMessageCombiner() {
+
+    }
+
+    /**
+     * Returns the sum of the two messages.
+     */
+    @Override
+    public Double combine(final Double messageA, final Double messageB) {
+        return Double.sum(messageA, messageB);
+    }
+
+    /**
+     * Returns the shared combiner instance wrapped in an {@link Optional}.
+     */
+    public static Optional<PageRankMessageCombiner> instance() {
+        return INSTANCE;
+    }
+}