You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:49 UTC
[23/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop
directories to org/apache/tinkerpop
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
new file mode 100644
index 0000000..8d573ea
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
+import org.javatuples.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The Memory of a {@link GraphComputer} is a global data structure whereby vertices can communicate information with one another.
+ * Moreover, it also contains global information about the state of the computation such as runtime and the current iteration.
+ * The Memory data is logically updated in parallel using associative/commutative methods which have embarrassingly parallel implementations.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Memory {
+
+ /**
+ * Whether the key exists in the memory.
+ *
+ * @param key key to search the memory for.
+ * @return whether the key exists
+ */
+ public default boolean exists(final String key) {
+ return this.keys().contains(key);
+ }
+
+ /**
+ * The set of keys currently associated with this memory.
+ *
+ * @return the memory's key set.
+ */
+ public Set<String> keys();
+
+ /**
+ * Get the value associated with the provided key.
+ *
+ * @param key the key of the value
+ * @param <R> the type of the value
+ * @return the value
+ * @throws IllegalArgumentException is thrown if the key does not exist
+ */
+ public <R> R get(final String key) throws IllegalArgumentException;
+
+ /**
+ * Set the value of the provided key. This is typically called in setup() and/or terminate() of the {@link VertexProgram}.
+ * If this is called during execute(), there is no guarantee as to the ultimately stored value as call order is indeterminate.
+ *
+ * @param key the key to set a value for
+ * @param value the value to set for the key
+ */
+ public void set(final String key, Object value);
+
+ /**
+ * A helper method that generates a {@link Map} of the memory key/values.
+ *
+ * @return the map representation of the memory key/values
+ */
+ public default Map<String, Object> asMap() {
+     // Pair every key that exists with its current value, then collect the pairs into a map.
+     final Map<String, Object> keyValues = this.keys().stream()
+             .filter(memoryKey -> this.exists(memoryKey))
+             .map(memoryKey -> Pair.with(memoryKey, this.get(memoryKey)))
+             .collect(Collectors.toMap(entry -> entry.getValue0(), entry -> entry.getValue1()));
+     // Expose the result through an unmodifiable wrapper so callers cannot alter it.
+     return Collections.unmodifiableMap(keyValues);
+ }
+
+ /**
+ * Get the current iteration number.
+ *
+ * @return the current iteration
+ */
+ public int getIteration();
+
+ /**
+ * Get the number of milliseconds the {@link GraphComputer} has been executing thus far.
+ *
+ * @return the total time in milliseconds
+ */
+ public long getRuntime();
+
+ /**
+ * Add the provided delta value to the long value currently stored at the key.
+ *
+ * @param key the key of the long value
+ * @param delta the adjusting amount (can be negative for decrement)
+ * @return the adjusted value (according to the previous iteration's current value + delta)
+ */
+ public long incr(final String key, final long delta);
+
+ /**
+ * Logically AND the provided boolean value with the boolean value currently stored at the key.
+ *
+ * @param key the key of the boolean value
+ * @param bool the boolean to AND
+ * @return the adjusted value (according to the previous iteration's current value && bool)
+ */
+ public boolean and(final String key, final boolean bool);
+
+ /**
+ * Logically OR the provided boolean value with the boolean value currently stored at the key.
+ *
+ * @param key the key of the boolean value
+ * @param bool the boolean to OR
+ * @return the adjusted value (according to the previous iteration's current value || bool)
+ */
+ public boolean or(final String key, final boolean bool);
+
+ /**
+ * A helper method that states whether the current iteration is 0.
+ *
+ * @return whether this is the first iteration
+ */
+ public default boolean isInitialIteration() {
+ return this.getIteration() == 0;
+ }
+
+ /**
+ * The Admin interface is used by the {@link GraphComputer} to update the Memory.
+ * The developer should never need to type-cast the provided Memory to Memory.Admin.
+ */
+ public interface Admin extends Memory {
+
+     /**
+      * Advance the iteration counter by one, using {@link #getIteration()} and {@link #setIteration(int)}.
+      */
+     public default void incrIteration() {
+         final int nextIteration = this.getIteration() + 1;
+         this.setIteration(nextIteration);
+     }
+
+     /**
+      * Set the iteration number.
+      *
+      * @param iteration the iteration number to store
+      */
+     public void setIteration(final int iteration);
+
+     /**
+      * Set the runtime. NOTE(review): presumably the millisecond value later reported by getRuntime() -- confirm.
+      *
+      * @param runtime the runtime to store
+      */
+     public void setRuntime(final long runtime);
+
+     /**
+      * Wrap this memory in an {@link ImmutableMemory}.
+      *
+      * @return a new {@link ImmutableMemory} constructed from this memory
+      */
+     public default Memory asImmutable() {
+         return new ImmutableMemory(this);
+     }
+ }
+
+
+ /**
+  * Factory methods that build the exceptions associated with graph computer memory.
+  * Each method only constructs and returns the exception; throwing it is left to the caller.
+  */
+ public static class Exceptions {
+
+     /**
+      * @return the exception describing a memory key that is the empty string
+      */
+     public static IllegalArgumentException memoryKeyCanNotBeEmpty() {
+         return new IllegalArgumentException("Graph computer memory key can not be the empty string");
+     }
+
+     /**
+      * @return the exception describing a memory key that is null
+      */
+     public static IllegalArgumentException memoryKeyCanNotBeNull() {
+         return new IllegalArgumentException("Graph computer memory key can not be null");
+     }
+
+     /**
+      * @return the exception describing a memory value that is null
+      */
+     public static IllegalArgumentException memoryValueCanNotBeNull() {
+         return new IllegalArgumentException("Graph computer memory value can not be null");
+     }
+
+     /**
+      * @return the exception describing an attempt to use memory while it is immutable
+      */
+     public static IllegalStateException memoryIsCurrentlyImmutable() {
+         return new IllegalStateException("Graph computer memory is currently immutable");
+     }
+
+     /**
+      * @param key the key for which the memory holds no value
+      * @return the exception describing the missing key
+      */
+     public static IllegalArgumentException memoryDoesNotExist(final String key) {
+         return new IllegalArgumentException("The memory does not have a value for provided key: " + key);
+     }
+
+     /**
+      * @param val the unsupported value; may be null, in which case its type is reported as "null"
+      * @return the exception describing the unsupported value and its type
+      */
+     public static UnsupportedOperationException dataTypeOfMemoryValueNotSupported(final Object val) {
+         // Resolve the type defensively: calling val.getClass() on a null value would throw a
+         // NullPointerException while building the message and hide the real problem.
+         final Object valueType = null == val ? null : val.getClass();
+         return new UnsupportedOperationException(String.format("Graph computer memory value [%s] is of type %s is not supported", val, valueType));
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
new file mode 100644
index 0000000..c2d7535
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+/**
+ * A MessageCombiner allows two messages en route to the same vertex to be aggregated into a single message.
+ * Message combining can reduce the number of messages sent between vertices and thus, reduce network traffic.
+ * Not all messages can be combined and thus, this is an optional feature of a {@link VertexProgram}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface MessageCombiner<M> {
+
+ /**
+ * Combine two messages and return a message containing the combination.
+ * In many instances, it is possible to simply merge the data in the second message into the first message.
+ * Such an optimization can limit the amount of object creation.
+ *
+ * @param messageA the first message
+ * @param messageB the second message
+ * @return the combination of the two messages
+ */
+ public M combine(final M messageA, final M messageB);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
new file mode 100644
index 0000000..75e4965
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MessageScope.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Arrays;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+/**
+ * A {@link MessageScope} represents the range of a message. A message can have multiple receivers and message scope
+ * allows the underlying {@link GraphComputer} to apply the message passing algorithm in whichever manner is most efficient.
+ * It is best to use {@link MessageScope.Local} if possible as that provides more room for optimization by vendors than {@link MessageScope.Global}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public abstract class MessageScope {
+
+ /**
+ * A Global message is directed at an arbitrary vertex in the graph.
+ * The recipient vertex need not be adjacent to the sending vertex.
+ * This message scope should be avoided if a {@link Local} can be used.
+ */
+ public final static class Global extends MessageScope {
+
+     // The vertices this scope was created with.
+     private final Iterable<Vertex> vertices;
+
+     // Shared scope created from an empty vertex array.
+     private static final Global INSTANCE = Global.of();
+
+     private Global(final Iterable<Vertex> vertices) {
+         this.vertices = vertices;
+     }
+
+     /**
+      * @return the shared scope that was created without any vertices
+      */
+     public static Global instance() {
+         return INSTANCE;
+     }
+
+     /**
+      * Create a scope over the supplied vertices; the array is exposed through a list view rather than copied.
+      *
+      * @param vertices the vertices of the scope
+      * @return a new global scope
+      */
+     public static Global of(final Vertex... vertices) {
+         final Iterable<Vertex> vertexList = Arrays.asList(vertices);
+         return new Global(vertexList);
+     }
+
+     /**
+      * Create a scope over the supplied iterable, which is stored as-is.
+      *
+      * @param vertices the vertices of the scope
+      * @return a new global scope
+      */
+     public static Global of(final Iterable<Vertex> vertices) {
+         return new Global(vertices);
+     }
+
+     /**
+      * @return the vertices this scope was created with
+      */
+     public Iterable<Vertex> vertices() {
+         return this.vertices;
+     }
+ }
+
+ /**
+ * A Local message is directed to an adjacent (or "memory adjacent") vertex.
+ * The adjacent vertex set is defined by the provided {@link Traversal} that dictates how to go from the sending vertex to the receiving vertex.
+ * This is the preferred message scope as it can potentially be optimized by the underlying {@link Messenger} implementation.
+ * The preferred optimization is to not distribute a message object to all adjacent vertices.
+ * Instead, allow the recipients to read a single message object stored at the "sending" vertex.
+ * This is possible via `Traversal.reverse()`. This optimization greatly reduces the amount of data created in the computation.
+ *
+ * @param <M> The {@link VertexProgram} message class
+ */
+ public final static class Local<M> extends MessageScope {
+     // Supplier of the vertex-to-edge traversal this scope was created with.
+     public final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal;
+     // Maps a message and an edge to a message.
+     // NOTE(review): presumably applied once per traversed edge -- confirm against the Messenger implementations.
+     public final BiFunction<M, Edge, M> edgeFunction;
+
+     private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
+         this.incidentTraversal = incidentTraversal;
+         this.edgeFunction = (final M m, final Edge e) -> m; // the default is an identity function
+     }
+
+     private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
+         this.incidentTraversal = incidentTraversal;
+         this.edgeFunction = edgeFunction;
+     }
+
+     /**
+      * Create a local scope whose edge function returns the message unchanged.
+      *
+      * @param incidentTraversal the supplier of the vertex-to-edge traversal
+      * @param <M>               the message type
+      * @return a new local scope
+      */
+     public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
+         // Diamond instead of the raw type, so the result is a type-checked Local<M> without an unchecked conversion.
+         return new Local<>(incidentTraversal);
+     }
+
+     /**
+      * Create a local scope with an explicit edge function.
+      *
+      * @param incidentTraversal the supplier of the vertex-to-edge traversal
+      * @param edgeFunction      the function mapping a message and an edge to a message
+      * @param <M>               the message type
+      * @return a new local scope
+      */
+     public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
+         return new Local<>(incidentTraversal, edgeFunction);
+     }
+
+     /**
+      * @return the edge function of this scope
+      */
+     public BiFunction<M, Edge, M> getEdgeFunction() {
+         return this.edgeFunction;
+     }
+
+     /**
+      * @return the traversal supplier of this scope
+      */
+     public Supplier<? extends Traversal<Vertex, Edge>> getIncidentTraversal() {
+         return this.incidentTraversal;
+     }
+
+     /**
+      * A helper class that can be used to generate the reverse traversal of the traversal within a {@link MessageScope.Local}.
+      */
+     public static class ReverseTraversalSupplier implements Supplier<Traversal<Vertex, Edge>> {
+
+         private final MessageScope.Local<?> localMessageScope;
+
+         public ReverseTraversalSupplier(final MessageScope.Local<?> localMessageScope) {
+             this.localMessageScope = localMessageScope;
+         }
+
+         /**
+          * Obtain a traversal from the wrapped scope's supplier and return the result of reversing it.
+          *
+          * @return the reversed traversal
+          */
+         @Override
+         public Traversal<Vertex, Edge> get() {
+             return this.localMessageScope.getIncidentTraversal().get().asAdmin().reverse();
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
new file mode 100644
index 0000000..087b61a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Messenger.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+/**
+ * The {@link Messenger} serves as the routing system for messages between vertices. For distributed systems,
+ * the messenger can implement a "message passing" engine (distributed memory). For single machine systems, the
+ * messenger can implement a "state sharing" engine (shared memory). Each messenger is tied to the particular
+ * vertex distributing the message.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface Messenger<M> {
+
+ /**
+ * The currently executing vertex can receive the messages of the provided {@link MessageScope}.
+ *
+ * @param messageScope the message scope of the messages to receive
+ * @return the messages for that vertex
+ */
+ public Iterable<M> receiveMessages(final MessageScope messageScope);
+
+ /**
+ * The currently executing vertex can send a message with provided {@link MessageScope}.
+ *
+ * @param messageScope the message scope of the message being sent
+ * @param message the message to send
+ */
+ public void sendMessage(final MessageScope messageScope, final M message);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
new file mode 100644
index 0000000..c207465
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.structure.Vertex;
+import org.apache.commons.configuration.Configuration;
+
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A {@link VertexProgram} represents one component of a distributed graph computation. Each vertex in the graph
+ * (logically) executes the {@link VertexProgram} instance in parallel. The collective behavior yields
+ * the computational result. In practice, a "worker" (i.e. task, thread, etc.) is responsible for executing the
+ * VertexProgram against each vertex that it has in its vertex set (a subset of the full graph vertex set).
+ * At minimum there is one "worker" for each vertex, though this is impractical in practice and {@link GraphComputer}
+ * implementations that leverage such a design are not expected to perform well due to the excess object creation.
+ * Any local state/fields in a VertexProgram is static to the vertices within the same worker set.
+ * It is not safe to assume that the VertexProgram's "worker" state will remain stable between iterations.
+ * Hence, the existence of {@link VertexProgram#workerIterationStart} and {@link VertexProgram#workerIterationEnd}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface VertexProgram<M> extends Cloneable {
+
+ public static final String VERTEX_PROGRAM = "gremlin.vertexProgram";
+
+ /**
+ * When it is necessary to store the state of the VertexProgram, this method is called.
+ * This is typically required when the VertexProgram needs to be serialized to another machine.
+ * Note that what is stored is simply the instance/configuration state, not any processed data.
+ * The default implementation provided simply stores the VertexProgram class name for reflective reconstruction.
+ * It is typically a good idea to VertexProgram.super.storeState().
+ *
+ * @param configuration the configuration to store the state of the VertexProgram in.
+ */
+ public default void storeState(final Configuration configuration) {
+ configuration.setProperty(VERTEX_PROGRAM, this.getClass().getName());
+ }
+
+ /**
+ * When it is necessary to load the state of the VertexProgram, this method is called.
+ * This is typically required when the VertexProgram needs to be serialized to another machine.
+ * Note that what is loaded is simply the instance state, not any processed data.
+ * It is possible for this method to be called for each and every vertex in the graph.
+ * Thus, it is important to only store/load that state which is needed during execution in order to reduce startup time.
+ *
+ * @param configuration the configuration to load the state of the VertexProgram from.
+ */
+ public default void loadState(final Configuration configuration) {
+
+ }
+
+ /**
+ * The method is called at the beginning of the computation.
+ * The method is global to the {@link GraphComputer} and as such, is not called for each vertex.
+ * During this stage, the {@link Memory} should be initialized to its "start state."
+ *
+ * @param memory The global memory of the GraphComputer
+ */
+ public void setup(final Memory memory);
+
+ /**
+ * This method denotes the main body of the computation and is executed on each vertex in the graph.
+ * This method is logically executed in parallel on all vertices in the graph.
+ * When the {@link Memory} is read, it is according to the aggregated state yielded in the previous iteration.
+ * When the {@link Memory} is written, the data will be aggregated at the end of the iteration for reading in the next iteration.
+ *
+ * @param vertex the {@link Vertex} to execute the {@link VertexProgram} on
+ * @param messenger the messenger that moves data between vertices
+ * @param memory the shared state between all vertices in the computation
+ */
+ public void execute(final Vertex vertex, final Messenger<M> messenger, final Memory memory);
+
+ /**
+ * The method is called at the end of each iteration to determine if the computation is complete.
+ * The method is global to the {@link GraphComputer} and as such, is not called for each {@link Vertex}.
+ * The {@link Memory} maintains the aggregated data from the last execute() iteration.
+ *
+ * @param memory The global memory of the {@link GraphComputer}
+ * @return whether or not to halt the computation
+ */
+ public boolean terminate(final Memory memory);
+
+ /**
+ * This method is called at the start of each iteration of each "computational chunk."
+ * The set of vertices in the graph are typically not processed with full parallelism.
+ * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
+ * The typical use is to create static VertexProgram state that exists for the iteration of the vertex subset.
+ * The default implementation is a no-op.
+ *
+ * @param memory The memory at the start of the iteration.
+ */
+ public default void workerIterationStart(final Memory memory) {
+
+ }
+
+ /**
+ * This method is called at the end of each iteration of each "computational chunk."
+ * The set of vertices in the graph are typically not processed with full parallelism.
+ * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
+ * The typical use is to destroy static VertexProgram state that existed during the iteration of the vertex subset.
+ * The default implementation is a no-op.
+ *
+ * @param memory The memory at the end of the iteration.
+ */
+ public default void workerIterationEnd(final Memory memory) {
+
+ }
+
+ /**
+ * The {@link com.tinkerpop.gremlin.structure.Element} properties that will be mutated during the computation.
+ * All properties in the graph are readable, but only the keys specified here are writable.
+ * The default is an empty set.
+ *
+ * @return the set of element keys that will be mutated during the vertex program's execution
+ */
+ public default Set<String> getElementComputeKeys() {
+ return Collections.emptySet();
+ }
+
+ /**
+ * The {@link Memory} keys that will be used during the computation.
+ * These are the only keys that can be read or written throughout the life of the {@link GraphComputer}.
+ * The default is an empty set.
+ *
+ * @return the set of memory keys that will be read/written
+ */
+ public default Set<String> getMemoryComputeKeys() {
+ return Collections.emptySet();
+ }
+
+ /**
+ * Combine the messages en route to a particular vertex. Useful to reduce the amount of data transmitted over the wire.
+ * For example, instead of sending two objects that will ultimately be merged at the vertex destination, merge/combine into one and send that object.
+ * If no message combiner is provided, then no messages will be combined.
+ * Furthermore, it is not guaranteed that all messages en route to the vertex will be combined and thus, combiner-state should not be used.
+ * The result of the vertex program algorithm should be the same regardless of whether message combining is executed or not.
+ *
+ * @return An optional denoting whether or not there is a message combiner associated with the vertex program.
+ */
+ public default Optional<MessageCombiner<M>> getMessageCombiner() {
+ return Optional.empty();
+ }
+
+ /**
+ * This method returns all the {@link MessageScope} possibilities for a particular iteration of the vertex program.
+ * The returned message scopes are the scopes that will be used to send messages during the stated iteration.
+ * It is not a requirement that all stated message scopes be used, just that it is possible that they be used during the iteration.
+ *
+ * @param memory an immutable form of the {@link Memory}
+ * @return all possible message scopes during said vertex program iteration
+ */
+ public Set<MessageScope> getMessageScopes(final Memory memory);
+
+ /**
+ * The set of {@link MapReduce} jobs that are associated with the {@link VertexProgram}.
+ * This is not necessarily the exhaustive list over the life of the {@link GraphComputer}.
+ * If MapReduce jobs are declared by GraphComputer.mapReduce(), they are not contained in this set.
+ * The default is an empty set.
+ *
+ * @return the set of {@link MapReduce} jobs associated with this {@link VertexProgram}
+ */
+ public default Set<MapReduce> getMapReducers() {
+ return Collections.emptySet();
+ }
+
+ /**
+ * When multiple workers on a single machine need VertexProgram instances, it is possible to use clone.
+ * This will provide a speedier way of generating instances, over the {@link VertexProgram#storeState} and {@link VertexProgram#loadState} model.
+ * The default implementation simply returns the object as it assumes that the VertexProgram instance is a stateless singleton.
+ *
+ * @return A clone of the VertexProgram object
+ * @throws CloneNotSupportedException
+ */
+ public VertexProgram<M> clone() throws CloneNotSupportedException;
+
+ /**
+ * A helper method to construct a {@link VertexProgram} given the content of the supplied configuration.
+ * The class of the VertexProgram is read from the {@link VertexProgram#VERTEX_PROGRAM} static configuration key.
+ * Once the VertexProgram is constructed, {@link VertexProgram#loadState} method is called with the provided configuration.
+ *
+ * @param configuration A configuration with requisite information to build a vertex program
+ * @param <V> The vertex program type
+ * @return the newly constructed vertex program
+ */
+ public static <V extends VertexProgram> V createVertexProgram(final Configuration configuration) {
+ try {
+ final Class<V> vertexProgramClass = (Class) Class.forName(configuration.getString(VERTEX_PROGRAM));
+ final Constructor<V> constructor = vertexProgramClass.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ final V vertexProgram = constructor.newInstance();
+ vertexProgram.loadState(configuration);
+ return vertexProgram;
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public interface Builder {
+
+ public Builder configure(final Object... keyValues);
+
+ public <P extends VertexProgram> P create();
+
+ }
+
+ public default Features getFeatures() {
+ return new Features() {
+ };
+ }
+
+ public interface Features {
+ public default boolean requiresGlobalMessageScopes() {
+ return false;
+ }
+
+ public default boolean requiresLocalMessageScopes() {
+ return false;
+ }
+
+ public default boolean requiresVertexAddition() {
+ return false;
+ }
+
+ public default boolean requiresVertexRemoval() {
+ return false;
+ }
+
+ public default boolean requiresVertexPropertyAddition() {
+ return false;
+ }
+
+ public default boolean requiresVertexPropertyRemoval() {
+ return false;
+ }
+
+ public default boolean requiresEdgeAddition() {
+ return false;
+ }
+
+ public default boolean requiresEdgeRemoval() {
+ return false;
+ }
+
+ public default boolean requiresEdgePropertyAddition() {
+ return false;
+ }
+
+ public default boolean requiresEdgePropertyRemoval() {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
new file mode 100644
index 0000000..8d39f64
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Property;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject, Serializable, MapReduce.NullObject, Integer, Integer> {
+
+ public static final String CLUSTER_COUNT_MEMORY_KEY = "gremlin.clusterCountMapReduce.memoryKey";
+ public static final String DEFAULT_MEMORY_KEY = "clusterCount";
+
+ private String memoryKey = DEFAULT_MEMORY_KEY;
+
+
+ private ClusterCountMapReduce() {
+
+ }
+
+ private ClusterCountMapReduce(final String memoryKey) {
+ this.memoryKey = memoryKey;
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(CLUSTER_COUNT_MEMORY_KEY, this.memoryKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.memoryKey = configuration.getString(CLUSTER_COUNT_MEMORY_KEY, DEFAULT_MEMORY_KEY);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true;
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, Serializable> emitter) {
+ final Property<Serializable> cluster = vertex.property(PeerPressureVertexProgram.CLUSTER);
+ if (cluster.isPresent()) {
+ emitter.emit(NullObject.instance(), cluster.value());
+ }
+ }
+
+ @Override
+ public void combine(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
+ this.reduce(key, values, emitter);
+ }
+
+ @Override
+ public void reduce(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
+ final Set<Serializable> set = new HashSet<>();
+ values.forEachRemaining(set::add);
+ emitter.emit(NullObject.instance(), set.size());
+
+ }
+
+ @Override
+ public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
+ return keyValues.next().getValue();
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.memoryKey;
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.mapReduceString(this, this.memoryKey);
+ }
+
+ //////////////////////////////
+
+ public static Builder build() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String memoryKey = DEFAULT_MEMORY_KEY;
+
+ private Builder() {
+
+ }
+
+ public Builder memoryKey(final String memoryKey) {
+ this.memoryKey = memoryKey;
+ return this;
+ }
+
+ public ClusterCountMapReduce create() {
+ return new ClusterCountMapReduce(this.memoryKey);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
new file mode 100644
index 0000000..a8a400d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Property;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ClusterPopulationMapReduce extends StaticMapReduce<Serializable, Long, Serializable, Long, Map<Serializable, Long>> {
+
+ public static final String CLUSTER_POPULATION_MEMORY_KEY = "gremlin.clusterPopulationMapReduce.memoryKey";
+ public static final String DEFAULT_MEMORY_KEY = "clusterPopulation";
+
+ private String memoryKey = DEFAULT_MEMORY_KEY;
+
+ private ClusterPopulationMapReduce() {
+ }
+
+ private ClusterPopulationMapReduce(final String memoryKey) {
+ this.memoryKey = memoryKey;
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(CLUSTER_POPULATION_MEMORY_KEY, this.memoryKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.memoryKey = configuration.getString(CLUSTER_POPULATION_MEMORY_KEY, DEFAULT_MEMORY_KEY);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true;
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<Serializable, Long> emitter) {
+ final Property<Serializable> cluster = vertex.property(PeerPressureVertexProgram.CLUSTER);
+ if (cluster.isPresent()) {
+ emitter.emit(cluster.value(), 1l);
+ }
+ }
+
+ @Override
+ public void combine(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
+ this.reduce(key, values, emitter);
+ }
+
+ @Override
+ public void reduce(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
+ long count = 0l;
+ while (values.hasNext()) {
+ count = count + values.next();
+ }
+ emitter.emit(key, count);
+ }
+
+ @Override
+ public Map<Serializable, Long> generateFinalResult(final Iterator<KeyValue<Serializable, Long>> keyValues) {
+ final Map<Serializable, Long> clusterPopulation = new HashMap<>();
+ keyValues.forEachRemaining(pair -> clusterPopulation.put(pair.getKey(), pair.getValue()));
+ return clusterPopulation;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.memoryKey;
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.mapReduceString(this, this.memoryKey);
+ }
+
+ //////////////////////////////
+
+ public static Builder build() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String memoryKey = DEFAULT_MEMORY_KEY;
+
+ private Builder() {
+
+ }
+
+ public Builder memoryKey(final String memoryKey) {
+ this.memoryKey = memoryKey;
+ return this;
+ }
+
+ public ClusterPopulationMapReduce create() {
+ return new ClusterPopulationMapReduce(this.memoryKey);
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
new file mode 100644
index 0000000..98fafcc
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.MessageScope;
+import com.tinkerpop.gremlin.process.computer.Messenger;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
+import com.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import com.tinkerpop.gremlin.process.graph.traversal.__;
+import com.tinkerpop.gremlin.process.util.MapHelper;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.VertexProperty;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.StreamFactory;
+import org.apache.commons.configuration.Configuration;
+import org.javatuples.Pair;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializable, Double>> {
+
+ private MessageScope.Local<?> voteScope = MessageScope.Local.of(__::outE);
+ private MessageScope.Local<?> countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
+ private final Set<MessageScope> VOTE_SCOPE = new HashSet<>(Collections.singletonList(this.voteScope));
+ private final Set<MessageScope> COUNT_SCOPE = new HashSet<>(Collections.singletonList(this.countScope));
+
+ public static final String CLUSTER = "gremlin.peerPressureVertexProgram.cluster";
+ public static final String VOTE_STRENGTH = "gremlin.peerPressureVertexProgram.voteStrength";
+
+ private static final String MAX_ITERATIONS = "gremlin.peerPressureVertexProgram.maxIterations";
+ private static final String DISTRIBUTE_VOTE = "gremlin.peerPressureVertexProgram.distributeVote";
+ private static final String INCIDENT_TRAVERSAL_SUPPLIER = "gremlin.peerPressureVertexProgram.incidentTraversalSupplier";
+ private static final String VOTE_TO_HALT = "gremlin.peerPressureVertexProgram.voteToHalt";
+
+ private LambdaHolder<Supplier<Traversal<Vertex, Edge>>> traversalSupplier;
+ private int maxIterations = 30;
+ private boolean distributeVote = false;
+
+ private static final Set<String> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(CLUSTER, VOTE_STRENGTH));
+ private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
+
+ private PeerPressureVertexProgram() {
+
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.traversalSupplier = LambdaHolder.loadState(configuration, INCIDENT_TRAVERSAL_SUPPLIER);
+ if (null != this.traversalSupplier) {
+ VertexProgramHelper.verifyReversibility(this.traversalSupplier.get().get().asAdmin());
+ this.voteScope = MessageScope.Local.of(this.traversalSupplier.get());
+ this.countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
+ }
+ this.maxIterations = configuration.getInt(MAX_ITERATIONS, 30);
+ this.distributeVote = configuration.getBoolean(DISTRIBUTE_VOTE, false);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ this.traversalSupplier.storeState(configuration);
+ configuration.setProperty(MAX_ITERATIONS, this.maxIterations);
+ configuration.setProperty(DISTRIBUTE_VOTE, this.distributeVote);
+
+ }
+
+ @Override
+ public Set<String> getElementComputeKeys() {
+ return ELEMENT_COMPUTE_KEYS;
+ }
+
+ @Override
+ public Set<String> getMemoryComputeKeys() {
+ return MEMORY_COMPUTE_KEYS;
+ }
+
+ @Override
+ public Set<MessageScope> getMessageScopes(final Memory memory) {
+ return this.distributeVote && memory.isInitialIteration() ? COUNT_SCOPE : VOTE_SCOPE;
+ }
+
+ @Override
+ public void setup(final Memory memory) {
+ memory.set(VOTE_TO_HALT, false);
+ }
+
+ @Override
+ public void execute(final Vertex vertex, Messenger<Pair<Serializable, Double>> messenger, final Memory memory) {
+ if (memory.isInitialIteration()) {
+ if (this.distributeVote) {
+ messenger.sendMessage(this.countScope, Pair.with('c', 1.0d));
+ } else {
+ double voteStrength = 1.0d;
+ vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
+ vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
+ messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
+ memory.and(VOTE_TO_HALT, false);
+ }
+ } else if (1 == memory.getIteration() && this.distributeVote) {
+ double voteStrength = 1.0d / StreamFactory.stream(messenger.receiveMessages(this.countScope)).map(Pair::getValue1).reduce(0.0d, (a, b) -> a + b);
+ vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
+ vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
+ messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
+ memory.and(VOTE_TO_HALT, false);
+ } else {
+ final Map<Serializable, Double> votes = new HashMap<>();
+ votes.put(vertex.value(CLUSTER), vertex.<Double>value(VOTE_STRENGTH));
+ messenger.receiveMessages(this.voteScope).forEach(message -> MapHelper.incr(votes, message.getValue0(), message.getValue1()));
+ Serializable cluster = PeerPressureVertexProgram.largestCount(votes);
+ if (null == cluster) cluster = (Serializable) vertex.id();
+ memory.and(VOTE_TO_HALT, vertex.value(CLUSTER).equals(cluster));
+ vertex.property(VertexProperty.Cardinality.single, CLUSTER, cluster);
+ messenger.sendMessage(this.voteScope, new Pair<>(cluster, vertex.<Double>value(VOTE_STRENGTH)));
+ }
+ }
+
+ @Override
+ public boolean terminate(final Memory memory) {
+ final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT) || memory.getIteration() >= (this.distributeVote ? this.maxIterations + 1 : this.maxIterations);
+ if (voteToHalt) {
+ return true;
+ } else {
+ memory.or(VOTE_TO_HALT, true);
+ return false;
+ }
+ }
+
+ private static <T> T largestCount(final Map<T, Double> map) {
+ T largestKey = null;
+ double largestValue = Double.MIN_VALUE;
+ for (Map.Entry<T, Double> entry : map.entrySet()) {
+ if (entry.getValue() == largestValue) {
+ if (null != largestKey && largestKey.toString().compareTo(entry.getKey().toString()) > 0) {
+ largestKey = entry.getKey();
+ largestValue = entry.getValue();
+ }
+ } else if (entry.getValue() > largestValue) {
+ largestKey = entry.getKey();
+ largestValue = entry.getValue();
+ }
+ }
+ return largestKey;
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.vertexProgramString(this, "distributeVote=" + this.distributeVote + ",maxIterations=" + this.maxIterations);
+ }
+
+ //////////////////////////////
+
+ public static Builder build() {
+ return new Builder();
+ }
+
+ public static class Builder extends AbstractVertexProgramBuilder<Builder> {
+
+
+ private Builder() {
+ super(PeerPressureVertexProgram.class);
+ }
+
+ public Builder maxIterations(final int iterations) {
+ this.configuration.setProperty(MAX_ITERATIONS, iterations);
+ return this;
+ }
+
+ public Builder distributeVote(final boolean distributeVote) {
+ this.configuration.setProperty(DISTRIBUTE_VOTE, distributeVote);
+ return this;
+ }
+
+ public Builder incident(final String scriptEngine, final String traversalScript) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, INCIDENT_TRAVERSAL_SUPPLIER, new String[]{scriptEngine, traversalScript});
+ return this;
+ }
+
+ public Builder incident(final String traversalScript) {
+ return incident(GREMLIN_GROOVY, traversalScript);
+ }
+
+ public Builder incident(final Supplier<Traversal<Vertex, Edge>> traversal) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, INCIDENT_TRAVERSAL_SUPPLIER, traversal);
+ return this;
+ }
+
+ public Builder incident(final Class<Supplier<Traversal<Vertex, Edge>>> traversalClass) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, INCIDENT_TRAVERSAL_SUPPLIER, traversalClass);
+ return this;
+ }
+
+ }
+
+ ////////////////////////////
+
+ @Override
+ public Features getFeatures() {
+ return new Features() {
+ @Override
+ public boolean requiresLocalMessageScopes() {
+ return true;
+ }
+
+ @Override
+ public boolean requiresVertexPropertyAddition() {
+ return true;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
new file mode 100644
index 0000000..13caa2f
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.lambda;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.function.TriConsumer;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class LambdaMapReduce<MK, MV, RK, RV, R> extends StaticMapReduce<MK, MV, RK, RV, R> {
+
+ public static final String MAP_LAMBDA = "gremlin.lambdaMapReduce.mapLambda";
+ public static final String MAP_KEY_SORT = "gremlin.lambdaMapReduce.mapKeySort";
+ public static final String COMBINE_LAMBDA = "gremlin.lambdaMapReduce.combineLambda";
+ public static final String REDUCE_LAMBDA = "gremlin.lambdaMapReduce.reduceLambda";
+ public static final String REDUCE_KEY_SORT = "gremlin.lambdaMapReduce.reduceKeySort";
+ public static final String MEMORY_LAMBDA = "gremlin.lambdaMapReduce.memoryLambda";
+ public static final String MEMORY_KEY = "gremlin.lambdaMapReduce.memoryKey";
+
+ private LambdaHolder<BiConsumer<Vertex, MapEmitter<MK, MV>>> mapLambdaHolder;
+ private LambdaHolder<Supplier<Comparator<MK>>> mapKeySortLambdaHolder;
+ private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> combineLambdaHolder;
+ private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> reduceLambdaHolder;
+ private LambdaHolder<Supplier<Comparator<RK>>> reduceKeySortLambdaHolder;
+ private LambdaHolder<Function<Iterator<KeyValue<RK, RV>>, R>> memoryLambdaHolder;
+ private String memoryKey;
+
+ private LambdaMapReduce() {
+
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.mapLambdaHolder = LambdaHolder.loadState(configuration, MAP_LAMBDA);
+ this.mapKeySortLambdaHolder = LambdaHolder.loadState(configuration, MAP_KEY_SORT);
+ this.combineLambdaHolder = LambdaHolder.loadState(configuration, COMBINE_LAMBDA);
+ this.reduceLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_LAMBDA);
+ this.reduceKeySortLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_KEY_SORT);
+ this.memoryLambdaHolder = LambdaHolder.loadState(configuration, MEMORY_LAMBDA);
+ this.memoryKey = configuration.getString(MEMORY_KEY, null);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ if (null != this.mapLambdaHolder)
+ this.mapLambdaHolder.storeState(configuration);
+ if (null != this.mapKeySortLambdaHolder)
+ this.mapKeySortLambdaHolder.storeState(configuration);
+ if (null != this.combineLambdaHolder)
+ this.combineLambdaHolder.storeState(configuration);
+ if (null != this.reduceLambdaHolder)
+ this.reduceLambdaHolder.storeState(configuration);
+ if (null != this.reduceKeySortLambdaHolder)
+ this.reduceKeySortLambdaHolder.storeState(configuration);
+ if (null != this.memoryLambdaHolder)
+ this.memoryLambdaHolder.storeState(configuration);
+ configuration.setProperty(MEMORY_KEY, this.memoryKey);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ if (stage.equals(Stage.MAP))
+ return null != this.mapLambdaHolder;
+ else if (stage.equals(Stage.COMBINE))
+ return null != this.combineLambdaHolder;
+ else
+ return null != this.reduceLambdaHolder;
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
+ this.mapLambdaHolder.get().accept(vertex, emitter);
+ }
+
+ @Override
+ public void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+ this.combineLambdaHolder.get().accept(key, values, emitter);
+ }
+
+ @Override
+ public void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+ this.reduceLambdaHolder.get().accept(key, values, emitter);
+ }
+
+ @Override
+ public Optional<Comparator<MK>> getMapKeySort() {
+ return null == this.mapKeySortLambdaHolder ? Optional.empty() : Optional.of(this.mapKeySortLambdaHolder.get().get());
+ }
+
+ @Override
+ public Optional<Comparator<RK>> getReduceKeySort() {
+ return null == this.reduceKeySortLambdaHolder ? Optional.empty() : Optional.of(this.reduceKeySortLambdaHolder.get().get());
+ }
+
+ @Override
+ public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues) {
+ return null == this.memoryLambdaHolder ? (R) keyValues : this.memoryLambdaHolder.get().apply(keyValues);
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.memoryKey;
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.mapReduceString(this, this.memoryKey);
+ }
+
+ //////////////////
+
+ public static <MK, MV, RK, RV, R> Builder<MK, MV, RK, RV, R> build() {
+ return new Builder<>();
+ }
+
+ public static class Builder<MK, MV, RK, RV, R> {
+
+ private final Configuration configuration = new BaseConfiguration();
+
+ public Builder<MK, MV, RK, RV, R> map(final BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>> mapLambda) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_LAMBDA, mapLambda);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> map(final Class<? extends BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>>> mapClass) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_LAMBDA, mapClass);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> map(final String scriptEngine, final String mapScript) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_LAMBDA, new String[]{scriptEngine, mapScript});
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> map(final String setupScript) {
+ return map(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+ }
+
+ //
+
+ public Builder<MK, MV, RK, RV, R> mapKeySort(final Comparator<MK> comparator) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_KEY_SORT, comparator);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> mapKeySort(final Class<? extends Comparator<MK>> comparatorClass) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MAP_KEY_SORT, comparatorClass);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> mapKeySort(final String scriptEngine, final String reduceScript) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_KEY_SORT, new String[]{scriptEngine, reduceScript});
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> mapKeySort(final String setupScript) {
+ return mapKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+ }
+
+ ////////////
+
+ public Builder<MK, MV, RK, RV, R> combine(TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> combineLambda) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, COMBINE_LAMBDA, combineLambda);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> combine(final Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> combineClass) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, COMBINE_LAMBDA, combineClass);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> combine(final String scriptEngine, final String combineScript) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, COMBINE_LAMBDA, new String[]{scriptEngine, combineScript});
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> combine(final String setupScript) {
+ return combine(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+ }
+
+ ////////////
+
+ public Builder<MK, MV, RK, RV, R> reduce(TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> reduceLambda) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_LAMBDA, reduceLambda);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> reduce(Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> reduceClass) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_LAMBDA, reduceClass);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> reduce(final String scriptEngine, final String reduceScript) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_LAMBDA, new String[]{scriptEngine, reduceScript});
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> reduce(final String setupScript) {
+ return reduce(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+ }
+
+ //
+
+ public Builder<MK, MV, RK, RV, R> reduceKeySort(final Supplier<Comparator<RK>> comparator) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_KEY_SORT, comparator);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> reduceKeySort(final Class<? extends Supplier<Comparator<RK>>> comparatorClass) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_KEY_SORT, comparatorClass);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> reduceKeySort(final String scriptEngine, final String reduceScript) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_KEY_SORT, new String[]{scriptEngine, reduceScript});
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> reduceKeySort(final String setupScript) {
+ return reduceKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+ }
+
+ ////////////
+
+ public Builder<MK, MV, RK, RV, R> memory(Function<Iterator<KeyValue<RK, RV>>, R> memoryLambda) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MEMORY_LAMBDA, memoryLambda);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> memory(Class<? extends Function<Iterator<KeyValue<RK, RV>>, R>> memoryClass) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MEMORY_LAMBDA, memoryClass);
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> memory(final String scriptEngine, final String memoryScript) {
+ LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MEMORY_LAMBDA, new String[]{scriptEngine, memoryScript});
+ return this;
+ }
+
+ public Builder<MK, MV, RK, RV, R> memory(final String setupScript) {
+ return memory(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
+ }
+
+ ////////////
+
+ public Builder<MK, MV, RK, RV, R> memoryKey(final String memoryKey) {
+ this.configuration.setProperty(LambdaMapReduce.MEMORY_KEY, memoryKey);
+ return this;
+ }
+
+ public LambdaMapReduce<MK, MV, RK, RV, R> create() {
+ LambdaMapReduce<MK, MV, RK, RV, R> lambdaMapReduce = new LambdaMapReduce<>();
+ lambdaMapReduce.loadState(this.configuration);
+ return lambdaMapReduce;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java
new file mode 100644
index 0000000..45bd9b9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/lambda/LambdaVertexProgram.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.lambda;
+
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.MessageScope;
+import com.tinkerpop.gremlin.process.computer.Messenger;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
+import com.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.function.TriConsumer;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * A {@link StaticVertexProgram} whose setup, execute and terminate steps are delegated to
+ * lambdas that are stored in, and loaded from, a {@link Configuration} through
+ * {@link LambdaHolder}.
+ * <p>
+ * When no holder is loaded for a step, a default is used: setup and execute do nothing and
+ * terminate returns {@code true}. Instances are obtained through {@link #build()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class LambdaVertexProgram<M extends Serializable> extends StaticVertexProgram<M> {
+
+    // Immutable on purpose: this one shared instance is returned to every caller of
+    // getMessageScopes(Memory), so it must not be modifiable through that reference.
+    private static final Set<MessageScope> MESSAGE_SCOPES = Collections.<MessageScope>singleton(MessageScope.Global.instance());
+
+    private static final String SETUP_LAMBDA = "gremlin.lambdaVertexProgram.setupLambda";
+    private static final String EXECUTE_LAMBDA = "gremlin.lambdaVertexProgram.executeLambda";
+    private static final String TERMINATE_LAMBDA = "gremlin.lambdaVertexProgram.terminateLambda";
+    private static final String ELEMENT_COMPUTE_KEYS = "gremlin.lambdaVertexProgram.elementComputeKeys";
+    private static final String MEMORY_COMPUTE_KEYS = "gremlin.lambdaVertexProgram.memoryComputeKeys";
+
+    // The holders are kept next to the resolved lambdas so that storeState can write them back.
+    private LambdaHolder<Consumer<Memory>> setupLambdaHolder;
+    private Consumer<Memory> setupLambda;
+    private LambdaHolder<TriConsumer<Vertex, Messenger<M>, Memory>> executeLambdaHolder;
+    private TriConsumer<Vertex, Messenger<M>, Memory> executeLambda;
+    private LambdaHolder<Predicate<Memory>> terminateLambdaHolder;
+    private Predicate<Memory> terminateLambda;
+    private Set<String> elementComputeKeys;
+    private Set<String> memoryComputeKeys;
+
+    private LambdaVertexProgram() {
+    }
+
+    /**
+     * Loads the three lambda holders and the two compute-key sets from the configuration.
+     * A missing holder is replaced by a default lambda; a missing compute-key entry is
+     * replaced by an empty set.
+     *
+     * @param configuration the configuration to read from
+     * @throws IllegalStateException if deserializing the compute keys fails
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        this.setupLambdaHolder = LambdaHolder.loadState(configuration, SETUP_LAMBDA);
+        this.executeLambdaHolder = LambdaHolder.loadState(configuration, EXECUTE_LAMBDA);
+        this.terminateLambdaHolder = LambdaHolder.loadState(configuration, TERMINATE_LAMBDA);
+        // Defaults for absent holders: no-op setup, no-op execute, terminate immediately.
+        this.setupLambda = null == this.setupLambdaHolder ? s -> {
+        } : this.setupLambdaHolder.get();
+        this.executeLambda = null == this.executeLambdaHolder ? (v, m, s) -> {
+        } : this.executeLambdaHolder.get();
+        this.terminateLambda = null == this.terminateLambdaHolder ? s -> true : this.terminateLambdaHolder.get();
+
+        try {
+            this.elementComputeKeys = configuration.containsKey(ELEMENT_COMPUTE_KEYS) ?
+                    VertexProgramHelper.deserialize(configuration, ELEMENT_COMPUTE_KEYS) : Collections.emptySet();
+            this.memoryComputeKeys = configuration.containsKey(MEMORY_COMPUTE_KEYS) ?
+                    VertexProgramHelper.deserialize(configuration, MEMORY_COMPUTE_KEYS) : Collections.emptySet();
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Writes the superclass state, every non-null lambda holder and both compute-key sets
+     * into the configuration.
+     *
+     * @param configuration the configuration to write to
+     * @throws IllegalStateException if serializing the compute keys fails
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        if (null != this.setupLambdaHolder)
+            this.setupLambdaHolder.storeState(configuration);
+        if (null != this.executeLambdaHolder)
+            this.executeLambdaHolder.storeState(configuration);
+        if (null != this.terminateLambdaHolder)
+            this.terminateLambdaHolder.storeState(configuration);
+
+        try {
+            VertexProgramHelper.serialize(this.elementComputeKeys, configuration, ELEMENT_COMPUTE_KEYS);
+            VertexProgramHelper.serialize(this.memoryComputeKeys, configuration, MEMORY_COMPUTE_KEYS);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Passes the memory to the configured setup lambda.
+     */
+    @Override
+    public void setup(final Memory memory) {
+        this.setupLambda.accept(memory);
+    }
+
+
+    /**
+     * Passes the vertex, messenger and memory to the configured execute lambda.
+     */
+    @Override
+    public void execute(final Vertex vertex, final Messenger<M> messenger, final Memory memory) {
+        this.executeLambda.accept(vertex, messenger, memory);
+    }
+
+    /**
+     * @return the result of testing the memory against the configured terminate lambda
+     */
+    @Override
+    public boolean terminate(final Memory memory) {
+        return this.terminateLambda.test(memory);
+    }
+
+    @Override
+    public Set<String> getElementComputeKeys() {
+        return this.elementComputeKeys;
+    }
+
+    @Override
+    public Set<String> getMemoryComputeKeys() {
+        return this.memoryComputeKeys;
+    }
+
+    /**
+     * @return an immutable set containing only the global message scope; the memory argument is not consulted
+     */
+    @Override
+    public Set<MessageScope> getMessageScopes(final Memory memory) {
+        return MESSAGE_SCOPES;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.vertexProgramString(this);
+    }
+
+    //////////////////////////////
+
+    /**
+     * @return a new, empty {@link Builder}
+     */
+    public static Builder build() {
+        return new Builder();
+    }
+
+    /**
+     * Fluent builder that records the setup, execute and terminate lambdas and the compute
+     * keys in the builder's configuration. Each lambda can be given as an object, as a class,
+     * or as a script; single-argument script overloads use the {@code GREMLIN_GROOVY} engine.
+     */
+    public static class Builder extends AbstractVertexProgramBuilder<Builder> {
+
+
+        private Builder() {
+            super(LambdaVertexProgram.class);
+        }
+
+        /**
+         * Stores the setup lambda as an object.
+         *
+         * @return this builder
+         */
+        public Builder setup(final Consumer<Memory> setupLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, SETUP_LAMBDA, setupLambda);
+            return this;
+        }
+
+        /**
+         * Stores the setup lambda as a class.
+         *
+         * @return this builder
+         */
+        public Builder setup(final Class<? extends Consumer<Memory>> setupClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, SETUP_LAMBDA, setupClass);
+            return this;
+        }
+
+        /**
+         * Stores the setup lambda as a script, recorded as the pair {engine name, script text}.
+         *
+         * @return this builder
+         */
+        public Builder setup(final String scriptEngine, final String setupScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, SETUP_LAMBDA, new String[]{scriptEngine, setupScript});
+            return this;
+        }
+
+        /**
+         * Stores the setup lambda as a script for the {@code GREMLIN_GROOVY} engine.
+         *
+         * @return this builder
+         */
+        public Builder setup(final String setupScript) {
+            return setup(GREMLIN_GROOVY, setupScript);
+        }
+
+        ///////
+
+        /**
+         * Stores the execute lambda as an object.
+         *
+         * @return this builder
+         */
+        public Builder execute(final TriConsumer<Vertex, Messenger, Memory> executeLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, EXECUTE_LAMBDA, executeLambda);
+            return this;
+        }
+
+        /**
+         * Stores the execute lambda as a class.
+         *
+         * @return this builder
+         */
+        public Builder execute(final Class<? extends TriConsumer<Vertex, Messenger, Memory>> executeClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, EXECUTE_LAMBDA, executeClass);
+            return this;
+        }
+
+        /**
+         * Stores the execute lambda as a script, recorded as the pair {engine name, script text}.
+         *
+         * @return this builder
+         */
+        public Builder execute(final String scriptEngine, final String executeScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, EXECUTE_LAMBDA, new String[]{scriptEngine, executeScript});
+            return this;
+        }
+
+        /**
+         * Stores the execute lambda as a script for the {@code GREMLIN_GROOVY} engine.
+         *
+         * @return this builder
+         */
+        public Builder execute(final String executeScript) {
+            return execute(GREMLIN_GROOVY, executeScript);
+        }
+
+        ///////
+
+        /**
+         * Stores the terminate lambda as an object.
+         *
+         * @return this builder
+         */
+        public Builder terminate(final Predicate<Memory> terminateLambda) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, TERMINATE_LAMBDA, terminateLambda);
+            return this;
+        }
+
+        /**
+         * Stores the terminate lambda as a class.
+         *
+         * @return this builder
+         */
+        public Builder terminate(final Class<? extends Predicate<Memory>> terminateClass) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, TERMINATE_LAMBDA, terminateClass);
+            return this;
+        }
+
+        /**
+         * Stores the terminate lambda as a script, recorded as the pair {engine name, script text}.
+         *
+         * @return this builder
+         */
+        public Builder terminate(final String scriptEngine, final String terminateScript) {
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, TERMINATE_LAMBDA, new String[]{scriptEngine, terminateScript});
+            return this;
+        }
+
+        /**
+         * Stores the terminate lambda as a script for the {@code GREMLIN_GROOVY} engine.
+         *
+         * @return this builder
+         */
+        public Builder terminate(final String terminateScript) {
+            return terminate(GREMLIN_GROOVY, terminateScript);
+        }
+
+        ///////
+
+        /**
+         * Serializes the given memory compute keys into the configuration.
+         *
+         * @return this builder
+         * @throws IllegalStateException if serialization fails
+         */
+        public Builder memoryComputeKeys(final Set<String> memoryComputeKeys) {
+            try {
+                VertexProgramHelper.serialize(memoryComputeKeys, configuration, MEMORY_COMPUTE_KEYS);
+                return this;
+            } catch (Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        /**
+         * Serializes the given element compute keys into the configuration.
+         *
+         * @return this builder
+         * @throws IllegalStateException if serialization fails
+         */
+        public Builder elementComputeKeys(final Set<String> elementComputeKeys) {
+            try {
+                VertexProgramHelper.serialize(elementComputeKeys, configuration, ELEMENT_COMPUTE_KEYS);
+                return this;
+            } catch (Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        /**
+         * Varargs convenience for {@link #memoryComputeKeys(Set)}; duplicates collapse into one entry.
+         *
+         * @return this builder
+         */
+        public Builder memoryComputeKeys(final String... memoryComputeKeys) {
+            return this.memoryComputeKeys(new HashSet<>(Arrays.asList(memoryComputeKeys)));
+        }
+
+        /**
+         * Varargs convenience for {@link #elementComputeKeys(Set)}; duplicates collapse into one entry.
+         *
+         * @return this builder
+         */
+        public Builder elementComputeKeys(final String... elementComputeKeys) {
+            return this.elementComputeKeys(new HashSet<>(Arrays.asList(elementComputeKeys)));
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java
new file mode 100644
index 0000000..c29bdb8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMapReduce.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.ranking.pagerank;
+
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.structure.Property;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Iterator;
+
+/**
+ * A {@link StaticMapReduce} that runs only the map stage. For each vertex carrying the
+ * {@link PageRankVertexProgram#PAGE_RANK} property it emits the vertex id paired with that
+ * property's value; vertices without the property emit nothing. The key/value iterator is
+ * returned unchanged as the final result.
+ * <p>
+ * The memory key reported by {@link #getMemoryKey()} defaults to {@link #DEFAULT_MEMORY_KEY}
+ * and can be set through the {@link Builder} or the configuration.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PageRankMapReduce extends StaticMapReduce<Object, Double, Object, Double, Iterator<KeyValue<Object, Double>>> {
+
+    public static final String PAGE_RANK_MEMORY_KEY = "gremlin.pageRankMapReduce.memoryKey";
+    public static final String DEFAULT_MEMORY_KEY = "pageRank";
+
+    private String memoryKey;
+
+    private PageRankMapReduce() {
+        this(DEFAULT_MEMORY_KEY);
+    }
+
+    private PageRankMapReduce(final String memoryKey) {
+        this.memoryKey = memoryKey;
+    }
+
+    /**
+     * Writes the superclass state and the current memory key into the configuration.
+     */
+    @Override
+    public void storeState(final Configuration configuration) {
+        super.storeState(configuration);
+        configuration.setProperty(PAGE_RANK_MEMORY_KEY, this.memoryKey);
+    }
+
+    /**
+     * Reads the memory key from the configuration, using {@link #DEFAULT_MEMORY_KEY} as the
+     * default passed to the lookup.
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        final String configuredKey = configuration.getString(PAGE_RANK_MEMORY_KEY, DEFAULT_MEMORY_KEY);
+        this.memoryKey = configuredKey;
+    }
+
+    /**
+     * @return {@code true} only for the map stage
+     */
+    @Override
+    public boolean doStage(final Stage stage) {
+        return stage.equals(Stage.MAP);
+    }
+
+    /**
+     * Emits (vertex id, page rank) when the vertex has a page rank property.
+     */
+    @Override
+    public void map(final Vertex vertex, final MapEmitter<Object, Double> emitter) {
+        final Property rankProperty = vertex.property(PageRankVertexProgram.PAGE_RANK);
+        if (!rankProperty.isPresent()) {
+            return;
+        }
+        final Object vertexId = vertex.id();
+        final Double rank = (Double) rankProperty.value();
+        emitter.emit(vertexId, rank);
+    }
+
+    /**
+     * @return the given iterator, untouched
+     */
+    @Override
+    public Iterator<KeyValue<Object, Double>> generateFinalResult(final Iterator<KeyValue<Object, Double>> keyValues) {
+        return keyValues;
+    }
+
+    @Override
+    public String getMemoryKey() {
+        return this.memoryKey;
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.mapReduceString(this, this.memoryKey);
+    }
+
+    //////////////////////////////
+
+    /**
+     * @return a new {@link Builder} preset with {@link #DEFAULT_MEMORY_KEY}
+     */
+    public static Builder build() {
+        return new Builder();
+    }
+
+    /**
+     * Fluent builder for {@link PageRankMapReduce}.
+     */
+    public static class Builder {
+
+        private String key = DEFAULT_MEMORY_KEY;
+
+        private Builder() {
+        }
+
+        /**
+         * Sets the memory key given to the created instance.
+         *
+         * @return this builder
+         */
+        public Builder memoryKey(final String memoryKey) {
+            this.key = memoryKey;
+            return this;
+        }
+
+        /**
+         * @return a new {@link PageRankMapReduce} using the builder's current memory key
+         */
+        public PageRankMapReduce create() {
+            return new PageRankMapReduce(this.key);
+        }
+
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java
new file mode 100644
index 0000000..43eae71
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankMessageCombiner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.ranking.pagerank;
+
+import com.tinkerpop.gremlin.process.computer.MessageCombiner;
+
+import java.util.Optional;
+
+/**
+ * A {@link MessageCombiner} over {@link Double} messages that combines two messages by
+ * adding them. A single shared instance, wrapped in an {@link Optional}, is available
+ * through {@link #instance()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PageRankMessageCombiner implements MessageCombiner<Double> {
+
+    private static final Optional<PageRankMessageCombiner> INSTANCE = Optional.of(new PageRankMessageCombiner());
+
+    private PageRankMessageCombiner() {
+    }
+
+    /**
+     * @return the shared combiner instance wrapped in an {@link Optional}
+     */
+    public static Optional<PageRankMessageCombiner> instance() {
+        return INSTANCE;
+    }
+
+    /**
+     * @return the sum of the two messages
+     */
+    @Override
+    public Double combine(final Double messageA, final Double messageB) {
+        final double sum = messageA.doubleValue() + messageB.doubleValue();
+        return Double.valueOf(sum);
+    }
+}