You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:02:11 UTC

[45/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop directories to org/apache/tinkerpop

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java
deleted file mode 100644
index b669aa9..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MapReduce.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.structure.Vertex;
-import org.apache.commons.configuration.Configuration;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Optional;
-
-/**
- * A MapReduce is composed of map(), combine(), and reduce() stages.
- * The map() stage processes the vertices of the {@link com.tinkerpop.gremlin.structure.Graph} in a logically parallel manner.
- * The combine() stage aggregates the values of a particular map emitted key prior to sending across the cluster.
- * The reduce() stage aggregates the values of the combine/map emitted keys for the keys that hash to the current machine in the cluster.
- * The interface presented here is nearly identical to the interface popularized by Hadoop, save that map() is over the vertices of the graph.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
-
-    public static final String MAP_REDUCE = "gremlin.mapReduce";
-
-    /**
-     * MapReduce is composed of three stages: map, combine, and reduce.
-     */
-    public static enum Stage {
-        MAP, COMBINE, REDUCE
-    }
-
-    /**
-     * When it is necessary to store the state of a MapReduce job, this method is called.
-     * This is typically required when the MapReduce job needs to be serialized to another machine.
-     * Note that what is stored is simply the instance state, not any processed data.
-     *
-     * @param configuration the configuration to store the state of the MapReduce job in.
-     */
-    public default void storeState(final Configuration configuration) {
-        configuration.setProperty(MAP_REDUCE, this.getClass().getName());
-    }
-
-    /**
-     * When it is necessary to load the state of a MapReduce job, this method is called.
-     * This is typically required when the MapReduce job needs to be serialized to another machine.
-     * Note that what is loaded is simply the instance state, not any processed data.
-     * <p/>
-     * It is important that the state loaded from loadState() is identical to any state created from a constructor.
-     * For those GraphComputers that do not need to use Configurations to migrate state between JVMs, the constructor will only be used.
-     *
-     * @param configuration the configuration to load the state of the MapReduce job from.
-     */
-    public default void loadState(final Configuration configuration) {
-
-    }
-
-    /**
-     * A MapReduce job can be map-only, map-reduce-only, or map-combine-reduce.
-     * Before executing the particular stage, this method is called to determine if the respective stage is defined.
-     * This method should return true if the respective stage has a non-default method implementation.
-     *
-     * @param stage the stage to check for definition.
-     * @return whether that stage should be executed.
-     */
-    public boolean doStage(final Stage stage);
-
-    /**
-     * The map() method is logically executed at all vertices in the graph in parallel.
-     * The map() method emits key/value pairs given some analysis of the data in the vertices (and/or its incident edges).
-     *
-     * @param vertex  the current vertex being map() processed.
-     * @param emitter the component that allows for key/value pairs to be emitted to the next stage.
-     */
-    public default void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
-    }
-
-    /**
-     * The combine() method is logically executed at all "machines" in parallel.
-     * The combine() method pre-combines the values for a key prior to propagation over the wire.
-     * The combine() method must emit the same key/value pairs as the reduce() method.
-     * If there is a combine() implementation, there must be a reduce() implementation.
-     * If the MapReduce implementation is single machine, it can skip executing this method as reduce() is sufficient.
-     *
-     * @param key     the key that has aggregated values
-     * @param values  the aggregated values associated with the key
-     * @param emitter the component that allows for key/value pairs to be emitted to the reduce stage.
-     */
-    public default void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
-    }
-
-    /**
-     * The reduce() method is logically executed on the "machine" the respective key hashes to.
-     * The reduce() method combines all the values associated with the key and emits key/value pairs.
-     *
-     * @param key     the key that has aggregated values
-     * @param values  the aggregated values associated with the key
-     * @param emitter the component that allows for key/value pairs to be emitted as the final result.
-     */
-    public default void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
-    }
-
-    /**
-     * If a {@link Comparator} is provided, then all pairs leaving the {@link MapEmitter} are sorted.
-     * The sorted results are either fed sorted to the combine/reduce-stage or as the final output.
-     * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
-     * The default implementation returns {@link Optional#empty}.
-     *
-     * @return an {@link Optional} of a comparator for sorting the map output.
-     */
-    public default Optional<Comparator<MK>> getMapKeySort() {
-        return Optional.empty();
-    }
-
-    /**
-     * If a {@link Comparator} is provided, then all pairs leaving the {@link ReduceEmitter} are sorted.
-     * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
-     * The default implementation returns {@link Optional#empty}.
-     *
-     * @return an {@link Optional} of a comparator for sorting the reduce output.
-     */
-    public default Optional<Comparator<RK>> getReduceKeySort() {
-        return Optional.empty();
-    }
-
-    /**
-     * The key/value pairs emitted by reduce() (or map() in a map-only job) can be iterated to generate a local JVM Java object.
-     *
-     * @param keyValues the key/value pairs that were emitted from reduce() (or map() in a map-only job)
-     * @return the resultant object formed from the emitted key/values.
-     */
-    public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues);
-
-    /**
-     * The results of the MapReduce job are associated with a memory-key to ultimately be stored in {@link Memory}.
-     *
-     * @return the memory key of the generated result object.
-     */
-    public String getMemoryKey();
-
-    /**
-     * The final result can be generated and added to {@link Memory} and accessible via {@link com.tinkerpop.gremlin.process.computer.util.DefaultComputerResult}.
-     * The default simply takes the object from generateFinalResult() and adds it to the Memory given getMemoryKey().
-     *
-     * @param memory    the memory of the {@link GraphComputer}
-     * @param keyValues the key/value pairs emitted from reduce() (or map() in a map only job).
-     */
-    public default void addResultToMemory(final Memory.Admin memory, final Iterator<KeyValue<RK, RV>> keyValues) {
-        memory.set(this.getMemoryKey(), this.generateFinalResult(keyValues));
-    }
-
-    /**
-     * When multiple workers on a single machine need MapReduce instances, it is possible to use clone.
-     * This will provide a speedier way of generating instances, over the {@link MapReduce#storeState} and {@link MapReduce#loadState} model.
-     * No default implementation is provided here; an implementation that is a stateless singleton may simply return itself.
-     *
-     * @return A clone of the MapReduce object
-     * @throws CloneNotSupportedException
-     */
-    public MapReduce<MK, MV, RK, RV, R> clone() throws CloneNotSupportedException;
-
-    /**
-     * A helper method to construct a {@link MapReduce} given the content of the supplied configuration.
-     * The class of the MapReduce is read from the {@link MapReduce#MAP_REDUCE} static configuration key.
-     * Once the MapReduce is constructed, {@link MapReduce#loadState} method is called with the provided configuration.
-     *
-     * @param configuration A configuration with requisite information to build a MapReduce
-     * @return the newly constructed MapReduce
-     */
-    public static <M extends MapReduce<MK, MV, RK, RV, R>, MK, MV, RK, RV, R> M createMapReduce(final Configuration configuration) {
-        try {
-            final Class<M> mapReduceClass = (Class) Class.forName(configuration.getString(MAP_REDUCE));
-            final Constructor<M> constructor = mapReduceClass.getDeclaredConstructor();
-            constructor.setAccessible(true);
-            final M mapReduce = constructor.newInstance();
-            mapReduce.loadState(configuration);
-            return mapReduce;
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    //////////////////
-
-    /**
-     * The MapEmitter is used to emit key/value pairs from the map() stage of the MapReduce job.
-     * The implementation of MapEmitter is up to the vendor, not the developer.
-     *
-     * @param <K> the key type
-     * @param <V> the value type
-     */
-    public interface MapEmitter<K, V> {
-        public void emit(final K key, final V value);
-
-        /**
-         * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
-         *
-         * @param value the value to emit.
-         */
-        public default void emit(final V value) {
-            this.emit((K) MapReduce.NullObject.instance(), value);
-        }
-    }
-
-    /**
-     * The ReduceEmitter is used to emit key/value pairs from the combine() and reduce() stages of the MapReduce job.
-     * The implementation of ReduceEmitter is up to the vendor, not the developer.
-     *
-     * @param <OK> the key type
-     * @param <OV> the value type
-     */
-    public interface ReduceEmitter<OK, OV> {
-        public void emit(final OK key, OV value);
-
-        /**
-         * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
-         *
-         * @param value the value to emit.
-         */
-        public default void emit(final OV value) {
-            this.emit((OK) MapReduce.NullObject.instance(), value);
-        }
-    }
-
-    //////////////////
-
-    /**
-     * A convenience singleton when a single key is needed so that all emitted values converge to the same combiner/reducer.
-     */
-    public static class NullObject implements Comparable<NullObject>, Serializable {
-        private static final NullObject INSTANCE = new NullObject();
-        private static final String NULL_OBJECT = "";
-
-        public static NullObject instance() {
-            return INSTANCE;
-        }
-
-        @Override
-        public int hashCode() {
-            return 666666666;
-        }
-
-        @Override
-        public boolean equals(final Object object) {
-            return object instanceof NullObject;
-        }
-
-        @Override
-        public int compareTo(final NullObject nullObject) {
-            return 0;
-        }
-
-        @Override
-        public String toString() {
-            return NULL_OBJECT;
-        }
-
-        private void readObject(final ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
-
-        }
-
-        private void writeObject(final ObjectOutputStream outputStream) throws IOException {
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java
deleted file mode 100644
index 8d573ea..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Memory.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.process.computer.util.ImmutableMemory;
-import org.javatuples.Pair;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * The Memory of a {@link GraphComputer} is a global data structure whereby vertices can communicate information with one another.
- * Moreover, it also contains global information about the state of the computation such as runtime and the current iteration.
- * The Memory data is logically updated in parallel using associative/commutative methods which have embarrassingly parallel implementations.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface Memory {
-
-    /**
-     * Whether the key exists in the memory.
-     *
-     * @param key key to search the memory for.
-     * @return whether the key exists
-     */
-    public default boolean exists(final String key) {
-        return this.keys().contains(key);
-    }
-
-    /**
-     * The set of keys currently associated with this memory.
-     *
-     * @return the memory's key set.
-     */
-    public Set<String> keys();
-
-    /**
-     * Get the value associated with the provided key.
-     *
-     * @param key the key of the value
-     * @param <R> the type of the value
-     * @return the value
-     * @throws IllegalArgumentException is thrown if the key does not exist
-     */
-    public <R> R get(final String key) throws IllegalArgumentException;
-
-    /**
-     * Set the value of the provided key. This is typically called in setup() and/or terminate() of the {@link VertexProgram}.
-     * If this is called during execute(), there is no guarantee as to the ultimately stored value as call order is indeterminate.
-     *
-     * @param key   the key to set a value for
-     * @param value the value to set for the key
-     */
-    public void set(final String key, Object value);
-
-    /**
-     * A helper method that generates a {@link Map} of the memory key/values.
-     *
-     * @return the map representation of the memory key/values
-     */
-    public default Map<String, Object> asMap() {
-        final Map<String, Object> map = keys().stream()
-                .filter(this::exists)
-                .map(key -> Pair.with(key, get(key)))
-                .collect(Collectors.toMap(kv -> kv.getValue0(), Pair::getValue1));
-        return Collections.unmodifiableMap(map);
-    }
-
-    /**
-     * Get the current iteration number.
-     *
-     * @return the current iteration
-     */
-    public int getIteration();
-
-    /**
-     * Get the number of milliseconds the {@link GraphComputer} has been executing thus far.
-     *
-     * @return the total time in milliseconds
-     */
-    public long getRuntime();
-
-    /**
-     * Add the provided delta value to the long value currently stored at the key.
-     *
-     * @param key   the key of the long value
-     * @param delta the adjusting amount (can be negative for decrement)
-     * @return the adjusted value (according to the previous iteration's current value + delta)
-     */
-    public long incr(final String key, final long delta);
-
-    /**
-     * Logically AND the provided boolean value with the boolean value currently stored at the key.
-     *
-     * @param key  the key of the boolean value
-     * @param bool the boolean to AND
-     * @return the adjusted value (according to the previous iteration's current value && bool)
-     */
-    public boolean and(final String key, final boolean bool);
-
-    /**
-     * Logically OR the provided boolean value with the boolean value currently stored at the key.
-     *
-     * @param key  the key of the boolean value
-     * @param bool the boolean to OR
-     * @return the adjusted value (according to the previous iteration's current value || bool)
-     */
-    public boolean or(final String key, final boolean bool);
-
-    /**
-     * A helper method that states whether the current iteration is 0.
-     *
-     * @return whether this is the first iteration
-     */
-    public default boolean isInitialIteration() {
-        return this.getIteration() == 0;
-    }
-
-    /**
-     * The Admin interface is used by the {@link GraphComputer} to update the Memory.
-     * The developer should never need to type-cast the provided Memory to Memory.Admin.
-     */
-    public interface Admin extends Memory {
-
-        public default void incrIteration() {
-            this.setIteration(this.getIteration() + 1);
-        }
-
-        public void setIteration(final int iteration);
-
-        public void setRuntime(final long runtime);
-
-        public default Memory asImmutable() {
-            return new ImmutableMemory(this);
-        }
-    }
-
-
-    public static class Exceptions {
-
-        public static IllegalArgumentException memoryKeyCanNotBeEmpty() {
-            return new IllegalArgumentException("Graph computer memory key can not be the empty string");
-        }
-
-        public static IllegalArgumentException memoryKeyCanNotBeNull() {
-            return new IllegalArgumentException("Graph computer memory key can not be null");
-        }
-
-        public static IllegalArgumentException memoryValueCanNotBeNull() {
-            return new IllegalArgumentException("Graph computer memory value can not be null");
-        }
-
-        public static IllegalStateException memoryIsCurrentlyImmutable() {
-            return new IllegalStateException("Graph computer memory is currently immutable");
-        }
-
-        public static IllegalArgumentException memoryDoesNotExist(final String key) {
-            return new IllegalArgumentException("The memory does not have a value for provided key: " + key);
-        }
-
-        public static UnsupportedOperationException dataTypeOfMemoryValueNotSupported(final Object val) {
-            return new UnsupportedOperationException(String.format("Graph computer memory value [%s] is of type %s is not supported", val, val.getClass()));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java
deleted file mode 100644
index c2d7535..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageCombiner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-/**
- * A MessageCombiner allows two messages en route to the same vertex to be aggregated into a single message.
- * Message combining can reduce the number of messages sent between vertices and thus, reduce network traffic.
- * Not all messages can be combined and thus, this is an optional feature of a {@link VertexProgram}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface MessageCombiner<M> {
-
-    /**
-     * Combine two messages and return a message containing the combination.
-     * In many instances, it is possible to simply merge the data in the second message into the first message.
-     * Such an optimization can limit the amount of object creation.
-     *
-     * @param messageA the first message
-     * @param messageB the second message
-     * @return the combination of the two messages
-     */
-    public M combine(final M messageA, final M messageB);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java
deleted file mode 100644
index 75e4965..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/MessageScope.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.process.Traversal;
-import com.tinkerpop.gremlin.structure.Edge;
-import com.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Arrays;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-
-/**
- * A {@link MessageScope} represents the range of a message. A message can have multiple receivers and message scope
- * allows the underlying {@link GraphComputer} to apply the message passing algorithm in whichever manner is most efficient.
- * It is best to use {@link MessageScope.Local} if possible as that provides more room for optimization by vendors than {@link MessageScope.Global}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- * @author Matthias Broecheler (me@matthiasb.com)
- */
-public abstract class MessageScope {
-
-    /**
-     * A Global message is directed at an arbitrary vertex in the graph.
-     * The recipient vertex need not be adjacent to the sending vertex.
-     * This message scope should be avoided if a {@link Local} can be used.
-     */
-    public final static class Global extends MessageScope {
-
-        private static final Global INSTANCE = Global.of();
-
-        private final Iterable<Vertex> vertices;
-
-        private Global(final Iterable<Vertex> vertices) {
-            this.vertices = vertices;
-        }
-
-        public static Global of(final Iterable<Vertex> vertices) {
-            return new Global(vertices);
-        }
-
-        public static Global of(final Vertex... vertices) {
-            return new Global(Arrays.asList(vertices));
-        }
-
-        public Iterable<Vertex> vertices() {
-            return this.vertices;
-        }
-
-        public static Global instance() {
-            return INSTANCE;
-        }
-    }
-
-    /**
-     * A Local message is directed to an adjacent (or "memory adjacent") vertex.
-     * The adjacent vertex set is defined by the provided {@link Traversal} that dictates how to go from the sending vertex to the receiving vertex.
-     * This is the preferred message scope as it can potentially be optimized by the underlying {@link Messenger} implementation.
-     * The preferred optimization is to not distribute a message object to all adjacent vertices.
-     * Instead, allow the recipients to read a single message object stored at the "sending" vertex.
-     * This is possible via `Traversal.reverse()`. This optimization greatly reduces the amount of data created in the computation.
-     *
-     * @param <M> The {@link VertexProgram} message class
-     */
-    public final static class Local<M> extends MessageScope {
-        public final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal;
-        public final BiFunction<M, Edge, M> edgeFunction;
-
-        private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
-            this.incidentTraversal = incidentTraversal;
-            this.edgeFunction = (final M m, final Edge e) -> m; // the default is an identity function
-        }
-
-        private Local(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
-            this.incidentTraversal = incidentTraversal;
-            this.edgeFunction = edgeFunction;
-        }
-
-        public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal) {
-            return new Local(incidentTraversal);
-        }
-
-        public static <M> Local<M> of(final Supplier<? extends Traversal<Vertex, Edge>> incidentTraversal, final BiFunction<M, Edge, M> edgeFunction) {
-            return new Local<>(incidentTraversal, edgeFunction);
-        }
-
-        public BiFunction<M, Edge, M> getEdgeFunction() {
-            return this.edgeFunction;
-        }
-
-        public Supplier<? extends Traversal<Vertex, Edge>> getIncidentTraversal() {
-            return this.incidentTraversal;
-        }
-
-        /**
-         * A helper class that can be used to generate the reverse traversal of the traversal within a {@link MessageScope.Local}.
-         */
-        public static class ReverseTraversalSupplier implements Supplier<Traversal<Vertex, Edge>> {
-
-            private final MessageScope.Local<?> localMessageScope;
-
-            public ReverseTraversalSupplier(final MessageScope.Local<?> localMessageScope) {
-                this.localMessageScope = localMessageScope;
-            }
-
-            public Traversal<Vertex, Edge> get() {
-                return this.localMessageScope.getIncidentTraversal().get().asAdmin().reverse();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java
deleted file mode 100644
index 087b61a..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/Messenger.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer;
-
-/**
- * The {@link Messenger} serves as the routing system for messages between vertices. For distributed systems,
- * the messenger can implement a "message passing" engine (distributed memory). For single machine systems, the
- * messenger can implement a "state sharing" engine (shared memory). Each messenger is tied to the particular
- * vertex distributing the message.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- * @author Matthias Broecheler (me@matthiasb.com)
- */
-public interface Messenger<M> {
-
-    /**
-     * The currently executing vertex can receive the messages of the provided {@link MessageScope}.
-     *
-     * @param messageScope the message scope of the messages to receive
-     * @return the messages for that vertex
-     */
-    public Iterable<M> receiveMessages(final MessageScope messageScope);
-
-    /**
-     * The currently executing vertex can send a message with provided {@link MessageScope}.
-     *
-     * @param messageScope the message scope of the message being sent
-     * @param message      the message to send
-     */
-    public void sendMessage(final MessageScope messageScope, final M message);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java
deleted file mode 100644
index c207465..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/VertexProgram.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.tinkerpop.gremlin.process.computer;
-
-import com.tinkerpop.gremlin.structure.Vertex;
-import org.apache.commons.configuration.Configuration;
-
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * A {@link VertexProgram} represents one component of a distributed graph computation. Each vertex in the graph
- * (logically) executes the {@link VertexProgram} instance in parallel. The collective behavior yields
- * the computational result. In practice, a "worker" (i.e. task, thread, etc.) is responsible for executing the
- * VertexProgram against each vertex that it has in its vertex set (a subset of the full graph vertex set).
- * In the extreme there is one "worker" for each vertex, though this is impractical and {@link GraphComputer}
- * implementations that leverage such a design are not expected to perform well due to the excess object creation.
- * Any local state/fields in a VertexProgram are static to the vertices within the same worker set.
- * It is not safe to assume that the VertexProgram's "worker" state will remain stable between iterations.
- * Hence, the existence of {@link VertexProgram#workerIterationStart} and {@link VertexProgram#workerIterationEnd}.
- *
- * @param <M> the type of the messages exchanged between vertices
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- * @author Matthias Broecheler (me@matthiasb.com)
- */
-public interface VertexProgram<M> extends Cloneable {
-
-    /**
-     * The configuration key under which the class name of the VertexProgram is stored.
-     */
-    public static final String VERTEX_PROGRAM = "gremlin.vertexProgram";
-
-    /**
-     * When it is necessary to store the state of the VertexProgram, this method is called.
-     * This is typically required when the VertexProgram needs to be serialized to another machine.
-     * Note that what is stored is simply the instance/configuration state, not any processed data.
-     * The default implementation provided simply stores the VertexProgram class name for reflective reconstruction.
-     * It is typically a good idea to call VertexProgram.super.storeState() when overriding.
-     *
-     * @param configuration the configuration to store the state of the VertexProgram in.
-     */
-    public default void storeState(final Configuration configuration) {
-        configuration.setProperty(VERTEX_PROGRAM, this.getClass().getName());
-    }
-
-    /**
-     * When it is necessary to load the state of the VertexProgram, this method is called.
-     * This is typically required when the VertexProgram needs to be serialized to another machine.
-     * Note that what is loaded is simply the instance state, not any processed data.
-     * It is possible for this method to be called for each and every vertex in the graph.
-     * Thus, it is important to only store/load that state which is needed during execution in order to reduce startup time.
-     * The default implementation is a no-op.
-     *
-     * @param configuration the configuration to load the state of the VertexProgram from.
-     */
-    public default void loadState(final Configuration configuration) {
-
-    }
-
-    /**
-     * The method is called at the beginning of the computation.
-     * The method is global to the {@link GraphComputer} and as such, is not called for each vertex.
-     * During this stage, the {@link Memory} should be initialized to its "start state."
-     *
-     * @param memory The global memory of the GraphComputer
-     */
-    public void setup(final Memory memory);
-
-    /**
-     * This method denotes the main body of the computation and is executed on each vertex in the graph.
-     * This method is logically executed in parallel on all vertices in the graph.
-     * When the {@link Memory} is read, it is according to the aggregated state yielded in the previous iteration.
-     * When the {@link Memory} is written, the data will be aggregated at the end of the iteration for reading in the next iteration.
-     *
-     * @param vertex    the {@link Vertex} to execute the {@link VertexProgram} on
-     * @param messenger the messenger that moves data between vertices
-     * @param memory    the shared state between all vertices in the computation
-     */
-    public void execute(final Vertex vertex, final Messenger<M> messenger, final Memory memory);
-
-    /**
-     * The method is called at the end of each iteration to determine if the computation is complete.
-     * The method is global to the {@link GraphComputer} and as such, is not called for each {@link Vertex}.
-     * The {@link Memory} maintains the aggregated data from the last execute() iteration.
-     *
-     * @param memory The global memory of the {@link GraphComputer}
-     * @return whether or not to halt the computation
-     */
-    public boolean terminate(final Memory memory);
-
-    /**
-     * This method is called at the start of each iteration of each "computational chunk."
-     * The set of vertices in the graph are typically not processed with full parallelism.
-     * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
-     * The typical use is to create static VertexProgram state that exists for the iteration of the vertex subset.
-     * The default implementation is a no-op.
-     *
-     * @param memory The memory at the start of the iteration.
-     */
-    public default void workerIterationStart(final Memory memory) {
-
-    }
-
-    /**
-     * This method is called at the end of each iteration of each "computational chunk."
-     * The set of vertices in the graph are typically not processed with full parallelism.
-     * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
-     * The typical use is to destroy static VertexProgram state that existed during the iteration of the vertex subset.
-     * The default implementation is a no-op.
-     *
-     * @param memory The memory at the end of the iteration.
-     */
-    public default void workerIterationEnd(final Memory memory) {
-
-    }
-
-    /**
-     * The {@link com.tinkerpop.gremlin.structure.Element} properties that will be mutated during the computation.
-     * All properties in the graph are readable, but only the keys specified here are writable.
-     * The default is an empty set.
-     *
-     * @return the set of element keys that will be mutated during the vertex program's execution
-     */
-    public default Set<String> getElementComputeKeys() {
-        return Collections.emptySet();
-    }
-
-    /**
-     * The {@link Memory} keys that will be used during the computation.
-     * These are the only keys that can be read or written throughout the life of the {@link GraphComputer}.
-     * The default is an empty set.
-     *
-     * @return the set of memory keys that will be read/written
-     */
-    public default Set<String> getMemoryComputeKeys() {
-        return Collections.emptySet();
-    }
-
-    /**
-     * Combine the messages en route to a particular vertex. Useful to reduce the amount of data transmitted over the wire.
-     * For example, instead of sending two objects that will ultimately be merged at the vertex destination, merge/combine into one and send that object.
-     * If no message combiner is provided, then no messages will be combined.
-     * Furthermore, it is not guaranteed that all messages en route to the vertex will be combined and thus, combiner-state should not be used.
-     * The result of the vertex program algorithm should be the same regardless of whether message combining is executed or not.
-     * The default is an empty optional.
-     *
-     * @return An optional denoting whether or not there is a message combiner associated with the vertex program.
-     */
-    public default Optional<MessageCombiner<M>> getMessageCombiner() {
-        return Optional.empty();
-    }
-
-    /**
-     * This method returns all the {@link MessageScope} possibilities for a particular iteration of the vertex program.
-     * The returned message scopes are the scopes that will be used to send messages during the stated iteration.
-     * It is not a requirement that all stated message scopes be used, just that it is possible that they be used during the iteration.
-     *
-     * @param memory an immutable form of the {@link Memory}
-     * @return all possible message scopes during said vertex program iteration
-     */
-    public Set<MessageScope> getMessageScopes(final Memory memory);
-
-    /**
-     * The set of {@link MapReduce} jobs that are associated with the {@link VertexProgram}.
-     * This is not necessarily the exhaustive list over the life of the {@link GraphComputer}.
-     * If MapReduce jobs are declared by GraphComputer.mapReduce(), they are not contained in this set.
-     * The default is an empty set.
-     *
-     * @return the set of {@link MapReduce} jobs associated with this {@link VertexProgram}
-     */
-    public default Set<MapReduce> getMapReducers() {
-        return Collections.emptySet();
-    }
-
-    /**
-     * When multiple workers on a single machine need VertexProgram instances, it is possible to use clone.
-     * This will provide a speedier way of generating instances, over the {@link VertexProgram#storeState} and {@link VertexProgram#loadState} model.
-     * This interface does not itself supply an implementation; an implementation that is a stateless singleton
-     * can simply return the object itself.
-     *
-     * @return A clone of the VertexProgram object
-     * @throws CloneNotSupportedException if the implementation cannot be cloned
-     */
-    public VertexProgram<M> clone() throws CloneNotSupportedException;
-
-    /**
-     * A helper method to construct a {@link VertexProgram} given the content of the supplied configuration.
-     * The class of the VertexProgram is read from the {@link VertexProgram#VERTEX_PROGRAM} static configuration key.
-     * The class is instantiated reflectively through its no-argument constructor, which need not be accessible.
-     * Once the VertexProgram is constructed, {@link VertexProgram#loadState} method is called with the provided configuration.
-     *
-     * @param configuration A configuration with requisite information to build a vertex program
-     * @param <V>           The vertex program type
-     * @return the newly constructed vertex program
-     * @throws IllegalStateException if the configuration names no vertex program class, or if the class cannot be
-     *                               loaded, instantiated or have its state loaded (the failure is kept as the cause)
-     */
-    @SuppressWarnings("unchecked")
-    public static <V extends VertexProgram> V createVertexProgram(final Configuration configuration) {
-        try {
-            final String vertexProgramClassName = configuration.getString(VERTEX_PROGRAM);
-            // fail with a descriptive message instead of the message-less NullPointerException of Class.forName(null)
-            if (null == vertexProgramClassName)
-                throw new IllegalStateException("The configuration does not specify a vertex program class via the key: " + VERTEX_PROGRAM);
-            // the caller chooses V, so the cast cannot be checked here; a mismatch surfaces at the call site
-            final Class<V> vertexProgramClass = (Class<V>) Class.forName(vertexProgramClassName);
-            final Constructor<V> constructor = vertexProgramClass.getDeclaredConstructor();
-            // vertex programs commonly hide their constructor behind a builder
-            constructor.setAccessible(true);
-            final V vertexProgram = constructor.newInstance();
-            vertexProgram.loadState(configuration);
-            return vertexProgram;
-        } catch (final IllegalStateException e) {
-            // already the advertised exception type; do not wrap it a second time
-            throw e;
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * The contract of a builder of vertex programs.
-     */
-    public interface Builder {
-
-        /**
-         * Configures the builder with the provided values.
-         *
-         * @param keyValues the configuration values (presumably alternating keys and values; confirm against implementations)
-         * @return a builder to continue the configuration with
-         */
-        public Builder configure(final Object... keyValues);
-
-        /**
-         * Creates the vertex program.
-         *
-         * @param <P> the vertex program type
-         * @return the vertex program
-         */
-        public <P extends VertexProgram> P create();
-
-    }
-
-    /**
-     * The {@link Features} of this vertex program.
-     * The default implementation returns an instance that overrides nothing and thus, every feature reports false.
-     *
-     * @return the features of the vertex program
-     */
-    public default Features getFeatures() {
-        return new Features() {
-        };
-    }
-
-    /**
-     * A set of boolean flags named after what a vertex program requires (message scopes and graph mutations).
-     * Every flag defaults to false and implementations override the ones that apply to them.
-     */
-    public interface Features {
-        public default boolean requiresGlobalMessageScopes() {
-            return false;
-        }
-
-        public default boolean requiresLocalMessageScopes() {
-            return false;
-        }
-
-        public default boolean requiresVertexAddition() {
-            return false;
-        }
-
-        public default boolean requiresVertexRemoval() {
-            return false;
-        }
-
-        public default boolean requiresVertexPropertyAddition() {
-            return false;
-        }
-
-        public default boolean requiresVertexPropertyRemoval() {
-            return false;
-        }
-
-        public default boolean requiresEdgeAddition() {
-            return false;
-        }
-
-        public default boolean requiresEdgeRemoval() {
-            return false;
-        }
-
-        public default boolean requiresEdgePropertyAddition() {
-            return false;
-        }
-
-        public default boolean requiresEdgePropertyRemoval() {
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
deleted file mode 100644
index 8d39f64..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
-
-import com.tinkerpop.gremlin.process.computer.KeyValue;
-import com.tinkerpop.gremlin.process.computer.MapReduce;
-import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import com.tinkerpop.gremlin.structure.Property;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.commons.configuration.Configuration;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * A {@link MapReduce} job that counts the number of distinct clusters in the graph, where the cluster of a
- * vertex is the value of its {@link PeerPressureVertexProgram#CLUSTER} property. Vertices without that property
- * are ignored.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject, Serializable, MapReduce.NullObject, Integer, Integer> {
-
-    public static final String CLUSTER_COUNT_MEMORY_KEY = "gremlin.clusterCountMapReduce.memoryKey";
-    public static final String DEFAULT_MEMORY_KEY = "clusterCount";
-
-    private String memoryKey = DEFAULT_MEMORY_KEY;
-
-
-    private ClusterCountMapReduce() {
-
-    }
-
-    private ClusterCountMapReduce(final String memoryKey) {
-        this.memoryKey = memoryKey;
-    }
-
-    /**
-     * Stores the state of the super class followed by the memory key of this job.
-     *
-     * @param configuration the configuration to store the state in
-     */
-    @Override
-    public void storeState(final Configuration configuration) {
-        super.storeState(configuration);
-        configuration.setProperty(CLUSTER_COUNT_MEMORY_KEY, this.memoryKey);
-    }
-
-    /**
-     * Loads the memory key, falling back to {@link #DEFAULT_MEMORY_KEY} when the configuration does not have one.
-     *
-     * @param configuration the configuration to load the state from
-     */
-    @Override
-    public void loadState(final Configuration configuration) {
-        this.memoryKey = configuration.getString(CLUSTER_COUNT_MEMORY_KEY, DEFAULT_MEMORY_KEY);
-    }
-
-    /**
-     * The map and reduce stages are executed, the combine stage is not. A combiner would emit the number of
-     * distinct clusters it has seen and the reducer would then count distinct partial counts instead of
-     * distinct clusters, which yields a wrong result.
-     *
-     * @param stage the stage in question
-     * @return true for every stage except the combine stage
-     */
-    @Override
-    public boolean doStage(final Stage stage) {
-        return !Stage.COMBINE.equals(stage);
-    }
-
-    /**
-     * Emits the cluster of the vertex under the single {@link MapReduce.NullObject} key, if the vertex has a cluster.
-     */
-    @Override
-    public void map(final Vertex vertex, final MapEmitter<NullObject, Serializable> emitter) {
-        final Property<Serializable> cluster = vertex.property(PeerPressureVertexProgram.CLUSTER);
-        if (cluster.isPresent()) {
-            emitter.emit(NullObject.instance(), cluster.value());
-        }
-    }
-
-    /**
-     * Delegates to {@link #reduce}. The combine stage is disabled in {@link #doStage} because partial distinct
-     * counts can not be merged by the reducer, so this method is not expected to be scheduled.
-     */
-    @Override
-    public void combine(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
-        this.reduce(key, values, emitter);
-    }
-
-    /**
-     * Emits the number of distinct values, i.e. the number of distinct clusters, under the single
-     * {@link MapReduce.NullObject} key.
-     */
-    @Override
-    public void reduce(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
-        final Set<Serializable> set = new HashSet<>();
-        values.forEachRemaining(set::add);
-        emitter.emit(NullObject.instance(), set.size());
-
-    }
-
-    /**
-     * Yields the value of the first key/value pair as the cluster count.
-     *
-     * @param keyValues the key/value pairs that were emitted
-     * @return the cluster count, or 0 if nothing was emitted (no vertex has a cluster)
-     */
-    @Override
-    public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
-        // nothing is emitted when no vertex carries a cluster property: report zero clusters instead of failing
-        return keyValues.hasNext() ? keyValues.next().getValue() : 0;
-    }
-
-    @Override
-    public String getMemoryKey() {
-        return this.memoryKey;
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.mapReduceString(this, this.memoryKey);
-    }
-
-    //////////////////////////////
-
-    /**
-     * @return a builder that creates the job with {@link #DEFAULT_MEMORY_KEY} unless another memory key is specified
-     */
-    public static Builder build() {
-        return new Builder();
-    }
-
-    public static class Builder {
-
-        private String memoryKey = DEFAULT_MEMORY_KEY;
-
-        private Builder() {
-
-        }
-
-        /**
-         * @param memoryKey the memory key of the job to create
-         * @return this builder
-         */
-        public Builder memoryKey(final String memoryKey) {
-            this.memoryKey = memoryKey;
-            return this;
-        }
-
-        /**
-         * @return a new job that uses the memory key of this builder
-         */
-        public ClusterCountMapReduce create() {
-            return new ClusterCountMapReduce(this.memoryKey);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
deleted file mode 100644
index a8a400d..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterPopulationMapReduce.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
-
-import com.tinkerpop.gremlin.process.computer.KeyValue;
-import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import com.tinkerpop.gremlin.structure.Property;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.commons.configuration.Configuration;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * A map/reduce job that computes the population of every cluster, i.e. the number of vertices per value of
- * the {@link PeerPressureVertexProgram#CLUSTER} property. Vertices without that property are not counted.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class ClusterPopulationMapReduce extends StaticMapReduce<Serializable, Long, Serializable, Long, Map<Serializable, Long>> {
-
-    public static final String CLUSTER_POPULATION_MEMORY_KEY = "gremlin.clusterPopulationMapReduce.memoryKey";
-    public static final String DEFAULT_MEMORY_KEY = "clusterPopulation";
-
-    private String memoryKey = DEFAULT_MEMORY_KEY;
-
-    private ClusterPopulationMapReduce() {
-    }
-
-    private ClusterPopulationMapReduce(final String memoryKey) {
-        this.memoryKey = memoryKey;
-    }
-
-    /**
-     * Writes the state of the super class and then the memory key of this job to the configuration.
-     */
-    @Override
-    public void storeState(final Configuration configuration) {
-        super.storeState(configuration);
-        configuration.setProperty(CLUSTER_POPULATION_MEMORY_KEY, memoryKey);
-    }
-
-    /**
-     * Reads the memory key from the configuration, using {@link #DEFAULT_MEMORY_KEY} when it is absent.
-     */
-    @Override
-    public void loadState(final Configuration configuration) {
-        memoryKey = configuration.getString(CLUSTER_POPULATION_MEMORY_KEY, DEFAULT_MEMORY_KEY);
-    }
-
-    /**
-     * Every stage is executed.
-     */
-    @Override
-    public boolean doStage(final Stage stage) {
-        return true;
-    }
-
-    /**
-     * Emits a count of one keyed by the cluster of the vertex; a vertex without a cluster emits nothing.
-     */
-    @Override
-    public void map(final Vertex vertex, final MapEmitter<Serializable, Long> emitter) {
-        final Property<Serializable> clusterProperty = vertex.property(PeerPressureVertexProgram.CLUSTER);
-        if (!clusterProperty.isPresent()) {
-            return;
-        }
-        emitter.emit(clusterProperty.value(), 1L);
-    }
-
-    /**
-     * Combining is the same summation as reducing.
-     */
-    @Override
-    public void combine(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
-        reduce(key, values, emitter);
-    }
-
-    /**
-     * Emits the sum of the provided counts under the provided key.
-     */
-    @Override
-    public void reduce(final Serializable key, final Iterator<Long> values, final ReduceEmitter<Serializable, Long> emitter) {
-        long population = 0L;
-        for (final Iterator<Long> counts = values; counts.hasNext(); ) {
-            population += counts.next();
-        }
-        emitter.emit(key, population);
-    }
-
-    /**
-     * Collects the emitted key/value pairs into a map of cluster to population.
-     */
-    @Override
-    public Map<Serializable, Long> generateFinalResult(final Iterator<KeyValue<Serializable, Long>> keyValues) {
-        final Map<Serializable, Long> populations = new HashMap<>();
-        while (keyValues.hasNext()) {
-            final KeyValue<Serializable, Long> keyValue = keyValues.next();
-            populations.put(keyValue.getKey(), keyValue.getValue());
-        }
-        return populations;
-    }
-
-    @Override
-    public String getMemoryKey() {
-        return memoryKey;
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.mapReduceString(this, memoryKey);
-    }
-
-    //////////////////////////////
-
-    /**
-     * @return a fresh builder preset with {@link #DEFAULT_MEMORY_KEY}
-     */
-    public static Builder build() {
-        return new Builder();
-    }
-
-    public static class Builder {
-
-        private String memoryKey = DEFAULT_MEMORY_KEY;
-
-        private Builder() {
-        }
-
-        /**
-         * @param memoryKey the memory key the created job will use
-         * @return this builder
-         */
-        public Builder memoryKey(final String memoryKey) {
-            this.memoryKey = memoryKey;
-            return this;
-        }
-
-        /**
-         * @return a new job using the memory key held by this builder
-         */
-        public ClusterPopulationMapReduce create() {
-            return new ClusterPopulationMapReduce(memoryKey);
-        }
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
deleted file mode 100644
index 98fafcc..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.clustering.peerpressure;
-
-import com.tinkerpop.gremlin.process.Traversal;
-import com.tinkerpop.gremlin.process.computer.Memory;
-import com.tinkerpop.gremlin.process.computer.MessageScope;
-import com.tinkerpop.gremlin.process.computer.Messenger;
-import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
-import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
-import com.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
-import com.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
-import com.tinkerpop.gremlin.process.graph.traversal.__;
-import com.tinkerpop.gremlin.process.util.MapHelper;
-import com.tinkerpop.gremlin.structure.Edge;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.VertexProperty;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import com.tinkerpop.gremlin.util.StreamFactory;
-import org.apache.commons.configuration.Configuration;
-import org.javatuples.Pair;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Supplier;
-
-/**
- * A vertex program for peer pressure clustering. Every vertex starts out in a cluster named after its own id and
- * repeatedly adopts the cluster that has the largest summed vote strength among its own vote and the votes it
- * receives. The computation halts once no vertex changes its cluster or the maximum number of iterations is reached.
- * The cluster and the vote strength of a vertex are stored in the {@link #CLUSTER} and {@link #VOTE_STRENGTH} properties.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializable, Double>> {
-
-    // the scope votes are sent along and its reverse, which is used to count the recipients of a vote;
-    // both are replaced in loadState() when an incident traversal is configured
-    private MessageScope.Local<?> voteScope = MessageScope.Local.of(__::outE);
-    private MessageScope.Local<?> countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
-
-    public static final String CLUSTER = "gremlin.peerPressureVertexProgram.cluster";
-    public static final String VOTE_STRENGTH = "gremlin.peerPressureVertexProgram.voteStrength";
-
-    private static final String MAX_ITERATIONS = "gremlin.peerPressureVertexProgram.maxIterations";
-    private static final String DISTRIBUTE_VOTE = "gremlin.peerPressureVertexProgram.distributeVote";
-    private static final String INCIDENT_TRAVERSAL_SUPPLIER = "gremlin.peerPressureVertexProgram.incidentTraversalSupplier";
-    private static final String VOTE_TO_HALT = "gremlin.peerPressureVertexProgram.voteToHalt";
-
-    private LambdaHolder<Supplier<Traversal<Vertex, Edge>>> traversalSupplier;
-    private int maxIterations = 30;
-    private boolean distributeVote = false;
-
-    private static final Set<String> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(CLUSTER, VOTE_STRENGTH));
-    private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
-
-    private PeerPressureVertexProgram() {
-
-    }
-
-    /**
-     * Loads the incident traversal (if any), the maximum number of iterations (default 30) and whether votes are
-     * distributed (default false). When an incident traversal is configured it is verified to be reversible and
-     * the vote and count scopes are rebuilt from it.
-     *
-     * @param configuration the configuration to load the state from
-     */
-    @Override
-    public void loadState(final Configuration configuration) {
-        this.traversalSupplier = LambdaHolder.loadState(configuration, INCIDENT_TRAVERSAL_SUPPLIER);
-        if (null != this.traversalSupplier) {
-            VertexProgramHelper.verifyReversibility(this.traversalSupplier.get().get().asAdmin());
-            this.voteScope = MessageScope.Local.of(this.traversalSupplier.get());
-            this.countScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.voteScope));
-        }
-        this.maxIterations = configuration.getInt(MAX_ITERATIONS, 30);
-        this.distributeVote = configuration.getBoolean(DISTRIBUTE_VOTE, false);
-    }
-
-    /**
-     * Stores the state of the super class, the incident traversal (if one was configured), the maximum number of
-     * iterations and whether votes are distributed.
-     *
-     * @param configuration the configuration to store the state in
-     */
-    @Override
-    public void storeState(final Configuration configuration) {
-        super.storeState(configuration);
-        // loadState() leaves the supplier null when no incident traversal is configured
-        if (null != this.traversalSupplier)
-            this.traversalSupplier.storeState(configuration);
-        configuration.setProperty(MAX_ITERATIONS, this.maxIterations);
-        configuration.setProperty(DISTRIBUTE_VOTE, this.distributeVote);
-
-    }
-
-    @Override
-    public Set<String> getElementComputeKeys() {
-        return ELEMENT_COMPUTE_KEYS;
-    }
-
-    @Override
-    public Set<String> getMemoryComputeKeys() {
-        return MEMORY_COMPUTE_KEYS;
-    }
-
-    /**
-     * The count scope is used in the initial iteration when votes are distributed, the vote scope otherwise.
-     *
-     * @param memory the memory used to determine whether this is the initial iteration
-     * @return a set containing the single message scope of the iteration
-     */
-    @Override
-    public Set<MessageScope> getMessageScopes(final Memory memory) {
-        // built on demand so that the scopes installed by loadState() are reported and not the initial defaults
-        final MessageScope scope = this.distributeVote && memory.isInitialIteration() ? this.countScope : this.voteScope;
-        return new HashSet<>(Collections.singletonList(scope));
-    }
-
-    @Override
-    public void setup(final Memory memory) {
-        memory.set(VOTE_TO_HALT, false);
-    }
-
-    /**
-     * Executes one iteration on the vertex:
-     * <ul>
-     * <li>initial iteration: when votes are distributed, a count message is sent along the count scope; otherwise the
-     * vertex is assigned to its own cluster with a vote strength of 1.0 and casts its first vote</li>
-     * <li>iteration 1 when votes are distributed: the vote strength is 1.0 divided by the sum of the received counts,
-     * the vertex is assigned to its own cluster and casts its first vote</li>
-     * <li>any other iteration: the vertex tallies its own vote and the received votes, adopts the cluster with the
-     * largest tally and votes for that cluster</li>
-     * </ul>
-     *
-     * @param vertex    the vertex to execute on
-     * @param messenger the messenger used to receive and send votes
-     * @param memory    the memory holding the vote-to-halt flag
-     */
-    @Override
-    public void execute(final Vertex vertex, Messenger<Pair<Serializable, Double>> messenger, final Memory memory) {
-        if (memory.isInitialIteration()) {
-            if (this.distributeVote) {
-                messenger.sendMessage(this.countScope, Pair.with('c', 1.0d));
-            } else {
-                double voteStrength = 1.0d;
-                vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
-                vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
-                messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
-                memory.and(VOTE_TO_HALT, false);
-            }
-        } else if (1 == memory.getIteration() && this.distributeVote) {
-            // NOTE(review): without any count message the sum is 0.0 and the vote strength becomes infinite
-            double voteStrength = 1.0d / StreamFactory.stream(messenger.receiveMessages(this.countScope)).map(Pair::getValue1).reduce(0.0d, (a, b) -> a + b);
-            vertex.property(VertexProperty.Cardinality.single, CLUSTER, vertex.id());
-            vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
-            messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
-            memory.and(VOTE_TO_HALT, false);
-        } else {
-            final Map<Serializable, Double> votes = new HashMap<>();
-            // the vertex votes for its current cluster with its own vote strength
-            votes.put(vertex.value(CLUSTER), vertex.<Double>value(VOTE_STRENGTH));
-            messenger.receiveMessages(this.voteScope).forEach(message -> MapHelper.incr(votes, message.getValue0(), message.getValue1()));
-            Serializable cluster = PeerPressureVertexProgram.largestCount(votes);
-            if (null == cluster) cluster = (Serializable) vertex.id();
-            // the flag only remains true if no vertex changes its cluster in this iteration
-            memory.and(VOTE_TO_HALT, vertex.value(CLUSTER).equals(cluster));
-            vertex.property(VertexProperty.Cardinality.single, CLUSTER, cluster);
-            messenger.sendMessage(this.voteScope, new Pair<>(cluster, vertex.<Double>value(VOTE_STRENGTH)));
-        }
-    }
-
-    /**
-     * Halts when the vote-to-halt flag is set or when the maximum number of iterations is reached, where one
-     * additional iteration is granted when votes are distributed. Otherwise the flag is or-ed with true before
-     * the next iteration starts.
-     *
-     * @param memory the memory holding the vote-to-halt flag and the iteration counter
-     * @return whether or not to halt the computation
-     */
-    @Override
-    public boolean terminate(final Memory memory) {
-        final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT) || memory.getIteration() >= (this.distributeVote ? this.maxIterations + 1 : this.maxIterations);
-        if (voteToHalt) {
-            return true;
-        } else {
-            memory.or(VOTE_TO_HALT, true);
-            return false;
-        }
-    }
-
-    /**
-     * Finds the key with the largest value. If several keys share the largest value, the key whose string
-     * representation is the lexicographically smallest is selected, which makes the result independent of the
-     * iteration order of the map.
-     *
-     * @param map the map of keys to their values
-     * @return the key with the largest value or null if the map is empty
-     */
-    private static <T> T largestCount(final Map<T, Double> map) {
-        T largestKey = null;
-        // Double.MIN_VALUE is the smallest positive double and would exclude entries with a value of zero or less
-        double largestValue = Double.NEGATIVE_INFINITY;
-        for (Map.Entry<T, Double> entry : map.entrySet()) {
-            if (entry.getValue() == largestValue) {
-                if (null != largestKey && largestKey.toString().compareTo(entry.getKey().toString()) > 0) {
-                    largestKey = entry.getKey();
-                    largestValue = entry.getValue();
-                }
-            } else if (entry.getValue() > largestValue) {
-                largestKey = entry.getKey();
-                largestValue = entry.getValue();
-            }
-        }
-        return largestKey;
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.vertexProgramString(this, "distributeVote=" + this.distributeVote + ",maxIterations=" + this.maxIterations);
-    }
-
-    //////////////////////////////
-
-    public static Builder build() {
-        return new Builder();
-    }
-
-    /**
-     * A builder that records the settings of the vertex program in its configuration.
-     */
-    public static class Builder extends AbstractVertexProgramBuilder<Builder> {
-
-
-        private Builder() {
-            super(PeerPressureVertexProgram.class);
-        }
-
-        /**
-         * @param iterations the maximum number of iterations (the program defaults to 30 when not set)
-         * @return this builder
-         */
-        public Builder maxIterations(final int iterations) {
-            this.configuration.setProperty(MAX_ITERATIONS, iterations);
-            return this;
-        }
-
-        /**
-         * @param distributeVote whether the vote strength of a vertex is derived from the counts it receives
-         *                       instead of being 1.0 (the program defaults to false when not set)
-         * @return this builder
-         */
-        public Builder distributeVote(final boolean distributeVote) {
-            this.configuration.setProperty(DISTRIBUTE_VOTE, distributeVote);
-            return this;
-        }
-
-        /**
-         * @param scriptEngine    the script engine of the traversal script
-         * @param traversalScript the script of the incident traversal
-         * @return this builder
-         */
-        public Builder incident(final String scriptEngine, final String traversalScript) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, INCIDENT_TRAVERSAL_SUPPLIER, new String[]{scriptEngine, traversalScript});
-            return this;
-        }
-
-        /**
-         * @param traversalScript the script of the incident traversal, registered with the GREMLIN_GROOVY script engine
-         * @return this builder
-         */
-        public Builder incident(final String traversalScript) {
-            return incident(GREMLIN_GROOVY, traversalScript);
-        }
-
-        /**
-         * @param traversal the supplier of the incident traversal, stored as an object
-         * @return this builder
-         */
-        public Builder incident(final Supplier<Traversal<Vertex, Edge>> traversal) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, INCIDENT_TRAVERSAL_SUPPLIER, traversal);
-            return this;
-        }
-
-        /**
-         * @param traversalClass the class of the supplier of the incident traversal, stored as a class
-         * @return this builder
-         */
-        public Builder incident(final Class<Supplier<Traversal<Vertex, Edge>>> traversalClass) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, INCIDENT_TRAVERSAL_SUPPLIER, traversalClass);
-            return this;
-        }
-
-    }
-
-    ////////////////////////////
-
-    /**
-     * @return features stating that local message scopes and vertex property addition are required
-     */
-    @Override
-    public Features getFeatures() {
-        return new Features() {
-            @Override
-            public boolean requiresLocalMessageScopes() {
-                return true;
-            }
-
-            @Override
-            public boolean requiresVertexPropertyAddition() {
-                return true;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java b/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
deleted file mode 100644
index 13caa2f..0000000
--- a/gremlin-core/src/main/java/com/tinkerpop/gremlin/process/computer/lambda/LambdaMapReduce.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.process.computer.lambda;
-
-import com.tinkerpop.gremlin.process.computer.KeyValue;
-import com.tinkerpop.gremlin.process.computer.MapReduce;
-import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
-import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
-import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import com.tinkerpop.gremlin.structure.Vertex;
-import com.tinkerpop.gremlin.structure.util.StringFactory;
-import com.tinkerpop.gremlin.util.function.TriConsumer;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-/**
- * A map/reduce job whose map, combine, reduce, key-sort and final-result ("memory") logic is not
- * hard-coded but held in {@link LambdaHolder}s that are loaded from a {@link Configuration}.
- * Instances are assembled with the {@link Builder} returned by {@link #build()}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class LambdaMapReduce<MK, MV, RK, RV, R> extends StaticMapReduce<MK, MV, RK, RV, R> {
-
-    // Configuration keys under which each lambda (and the memory key) is stored.
-    public static final String MAP_LAMBDA = "gremlin.lambdaMapReduce.mapLambda";
-    public static final String MAP_KEY_SORT = "gremlin.lambdaMapReduce.mapKeySort";
-    public static final String COMBINE_LAMBDA = "gremlin.lambdaMapReduce.combineLambda";
-    public static final String REDUCE_LAMBDA = "gremlin.lambdaMapReduce.reduceLambda";
-    public static final String REDUCE_KEY_SORT = "gremlin.lambdaMapReduce.reduceKeySort";
-    public static final String MEMORY_LAMBDA = "gremlin.lambdaMapReduce.memoryLambda";
-    public static final String MEMORY_KEY = "gremlin.lambdaMapReduce.memoryKey";
-
-    // Holders are assigned in loadState(); the rest of the class treats a null holder as "not configured".
-    private LambdaHolder<BiConsumer<Vertex, MapEmitter<MK, MV>>> mapLambdaHolder;
-    private LambdaHolder<Supplier<Comparator<MK>>> mapKeySortLambdaHolder;
-    private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> combineLambdaHolder;
-    private LambdaHolder<TriConsumer<MK, Iterator<MV>, ReduceEmitter<RK, RV>>> reduceLambdaHolder;
-    private LambdaHolder<Supplier<Comparator<RK>>> reduceKeySortLambdaHolder;
-    private LambdaHolder<Function<Iterator<KeyValue<RK, RV>>, R>> memoryLambdaHolder;
-    private String memoryKey;
-
-    /**
-     * Not publicly constructible; use {@link #build()}.
-     */
-    private LambdaMapReduce() {
-
-    }
-
-    /**
-     * Populates every lambda holder and the memory key from the given configuration.
-     *
-     * @param configuration the configuration to read from; the memory key defaults to {@code null} when absent
-     */
-    @Override
-    public void loadState(final Configuration configuration) {
-        this.mapLambdaHolder = LambdaHolder.loadState(configuration, MAP_LAMBDA);
-        this.mapKeySortLambdaHolder = LambdaHolder.loadState(configuration, MAP_KEY_SORT);
-        this.combineLambdaHolder = LambdaHolder.loadState(configuration, COMBINE_LAMBDA);
-        this.reduceLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_LAMBDA);
-        this.reduceKeySortLambdaHolder = LambdaHolder.loadState(configuration, REDUCE_KEY_SORT);
-        this.memoryLambdaHolder = LambdaHolder.loadState(configuration, MEMORY_LAMBDA);
-        this.memoryKey = configuration.getString(MEMORY_KEY, null);
-    }
-
-    /**
-     * Writes the superclass state, every non-null lambda holder and the memory key into the given configuration.
-     *
-     * @param configuration the configuration to write to
-     */
-    @Override
-    public void storeState(final Configuration configuration) {
-        super.storeState(configuration);
-        if (null != this.mapLambdaHolder)
-            this.mapLambdaHolder.storeState(configuration);
-        if (null != this.mapKeySortLambdaHolder)
-            this.mapKeySortLambdaHolder.storeState(configuration);
-        if (null != this.combineLambdaHolder)
-            this.combineLambdaHolder.storeState(configuration);
-        if (null != this.reduceLambdaHolder)
-            this.reduceLambdaHolder.storeState(configuration);
-        if (null != this.reduceKeySortLambdaHolder)
-            this.reduceKeySortLambdaHolder.storeState(configuration);
-        if (null != this.memoryLambdaHolder)
-            this.memoryLambdaHolder.storeState(configuration);
-        configuration.setProperty(MEMORY_KEY, this.memoryKey);
-    }
-
-    /**
-     * Reports whether a stage should run, i.e. whether a lambda holder exists for it.
-     *
-     * @param stage the stage being queried; any value other than {@code MAP} or {@code COMBINE}
-     *              is answered with the reduce holder
-     * @return {@code true} if the corresponding holder is non-null
-     */
-    @Override
-    public boolean doStage(final Stage stage) {
-        if (stage.equals(Stage.MAP))
-            return null != this.mapLambdaHolder;
-        else if (stage.equals(Stage.COMBINE))
-            return null != this.combineLambdaHolder;
-        else
-            return null != this.reduceLambdaHolder;
-    }
-
-    /**
-     * Delegates to the configured map lambda. No null check is made on the holder here.
-     */
-    @Override
-    public void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
-        this.mapLambdaHolder.get().accept(vertex, emitter);
-    }
-
-    /**
-     * Delegates to the configured combine lambda. No null check is made on the holder here.
-     */
-    @Override
-    public void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
-        this.combineLambdaHolder.get().accept(key, values, emitter);
-    }
-
-    /**
-     * Delegates to the configured reduce lambda. No null check is made on the holder here.
-     */
-    @Override
-    public void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
-        this.reduceLambdaHolder.get().accept(key, values, emitter);
-    }
-
-    /**
-     * @return the comparator produced by the configured map-key-sort supplier, or empty if none is configured
-     */
-    @Override
-    public Optional<Comparator<MK>> getMapKeySort() {
-        return null == this.mapKeySortLambdaHolder ? Optional.empty() : Optional.of(this.mapKeySortLambdaHolder.get().get());
-    }
-
-    /**
-     * @return the comparator produced by the configured reduce-key-sort supplier, or empty if none is configured
-     */
-    @Override
-    public Optional<Comparator<RK>> getReduceKeySort() {
-        return null == this.reduceKeySortLambdaHolder ? Optional.empty() : Optional.of(this.reduceKeySortLambdaHolder.get().get());
-    }
-
-    /**
-     * Produces the final result from the reduce output.
-     *
-     * @param keyValues the key/value pairs to turn into the final result
-     * @return the memory lambda applied to {@code keyValues}, or {@code keyValues} itself when no
-     * memory lambda is configured
-     */
-    @Override
-    @SuppressWarnings("unchecked") // without a memory lambda the iterator itself is handed back as R; the cast is unchecked by design
-    public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues) {
-        return null == this.memoryLambdaHolder ? (R) keyValues : this.memoryLambdaHolder.get().apply(keyValues);
-    }
-
-    /**
-     * @return the memory key loaded from the configuration, possibly {@code null}
-     */
-    @Override
-    public String getMemoryKey() {
-        return this.memoryKey;
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.mapReduceString(this, this.memoryKey);
-    }
-
-    //////////////////
-
-    /**
-     * @return a fresh {@link Builder} with an empty configuration
-     */
-    public static <MK, MV, RK, RV, R> Builder<MK, MV, RK, RV, R> build() {
-        return new Builder<>();
-    }
-
-    /**
-     * Fluent builder that records each lambda in a private {@link BaseConfiguration} and, on
-     * {@link #create()}, loads a new {@link LambdaMapReduce} from it. Each lambda can be given as an
-     * object, a class, or a script (engine name plus script text); the single-string script overloads
-     * use {@code AbstractVertexProgramBuilder.GREMLIN_GROOVY} as the engine.
-     */
-    public static class Builder<MK, MV, RK, RV, R> {
-
-        private final Configuration configuration = new BaseConfiguration();
-
-        /** Stores the map lambda as an object under {@link #MAP_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> map(final BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>> mapLambda) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_LAMBDA, mapLambda);
-            return this;
-        }
-
-        /** Stores the map lambda as a class under {@link #MAP_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> map(final Class<? extends BiConsumer<Vertex, MapReduce.MapEmitter<MK, MV>>> mapClass) {
-            // Type.CLASS, like every other Class-accepting overload in this builder (this was previously Type.SCRIPT).
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MAP_LAMBDA, mapClass);
-            return this;
-        }
-
-        /** Stores the map lambda as a script (engine name, script text) under {@link #MAP_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> map(final String scriptEngine, final String mapScript) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_LAMBDA, new String[]{scriptEngine, mapScript});
-            return this;
-        }
-
-        /** Script overload of {@link #map(String, String)} using the default script engine. */
-        public Builder<MK, MV, RK, RV, R> map(final String setupScript) {
-            return map(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
-        }
-
-        //
-
-        /**
-         * Stores the map-key comparator as an object under {@link #MAP_KEY_SORT}.
-         * <p>
-         * NOTE(review): {@code getMapKeySort()} reads this entry back as a
-         * {@code Supplier<Comparator<MK>>}, and {@code reduceKeySort} accepts a {@code Supplier};
-         * storing a bare {@code Comparator} here looks inconsistent. Confirm against
-         * {@code LambdaHolder} before relying on this overload.
-         */
-        public Builder<MK, MV, RK, RV, R> mapKeySort(final Comparator<MK> comparator) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MAP_KEY_SORT, comparator);
-            return this;
-        }
-
-        /**
-         * Stores the map-key comparator class under {@link #MAP_KEY_SORT}.
-         * <p>
-         * NOTE(review): same {@code Comparator} vs. {@code Supplier<Comparator>} question as the
-         * object overload; confirm.
-         */
-        public Builder<MK, MV, RK, RV, R> mapKeySort(final Class<? extends Comparator<MK>> comparatorClass) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MAP_KEY_SORT, comparatorClass);
-            return this;
-        }
-
-        /** Stores the map-key sort as a script (engine name, script text) under {@link #MAP_KEY_SORT}. */
-        public Builder<MK, MV, RK, RV, R> mapKeySort(final String scriptEngine, final String reduceScript) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MAP_KEY_SORT, new String[]{scriptEngine, reduceScript});
-            return this;
-        }
-
-        /** Script overload of {@link #mapKeySort(String, String)} using the default script engine. */
-        public Builder<MK, MV, RK, RV, R> mapKeySort(final String setupScript) {
-            return mapKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
-        }
-
-        ////////////
-
-        /** Stores the combine lambda as an object under {@link #COMBINE_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> combine(final TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> combineLambda) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, COMBINE_LAMBDA, combineLambda);
-            return this;
-        }
-
-        /** Stores the combine lambda as a class under {@link #COMBINE_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> combine(final Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> combineClass) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, COMBINE_LAMBDA, combineClass);
-            return this;
-        }
-
-        /** Stores the combine lambda as a script (engine name, script text) under {@link #COMBINE_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> combine(final String scriptEngine, final String combineScript) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, COMBINE_LAMBDA, new String[]{scriptEngine, combineScript});
-            return this;
-        }
-
-        /** Script overload of {@link #combine(String, String)} using the default script engine. */
-        public Builder<MK, MV, RK, RV, R> combine(final String setupScript) {
-            return combine(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
-        }
-
-        ////////////
-
-        /** Stores the reduce lambda as an object under {@link #REDUCE_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> reduce(final TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>> reduceLambda) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_LAMBDA, reduceLambda);
-            return this;
-        }
-
-        /** Stores the reduce lambda as a class under {@link #REDUCE_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> reduce(final Class<? extends TriConsumer<MK, Iterator<MV>, MapReduce.ReduceEmitter<RK, RV>>> reduceClass) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_LAMBDA, reduceClass);
-            return this;
-        }
-
-        /** Stores the reduce lambda as a script (engine name, script text) under {@link #REDUCE_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> reduce(final String scriptEngine, final String reduceScript) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_LAMBDA, new String[]{scriptEngine, reduceScript});
-            return this;
-        }
-
-        /** Script overload of {@link #reduce(String, String)} using the default script engine. */
-        public Builder<MK, MV, RK, RV, R> reduce(final String setupScript) {
-            return reduce(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
-        }
-
-        //
-
-        /** Stores the reduce-key comparator supplier as an object under {@link #REDUCE_KEY_SORT}. */
-        public Builder<MK, MV, RK, RV, R> reduceKeySort(final Supplier<Comparator<RK>> comparator) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, REDUCE_KEY_SORT, comparator);
-            return this;
-        }
-
-        /** Stores the reduce-key comparator supplier class under {@link #REDUCE_KEY_SORT}. */
-        public Builder<MK, MV, RK, RV, R> reduceKeySort(final Class<? extends Supplier<Comparator<RK>>> comparatorClass) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, REDUCE_KEY_SORT, comparatorClass);
-            return this;
-        }
-
-        /** Stores the reduce-key sort as a script (engine name, script text) under {@link #REDUCE_KEY_SORT}. */
-        public Builder<MK, MV, RK, RV, R> reduceKeySort(final String scriptEngine, final String reduceScript) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, REDUCE_KEY_SORT, new String[]{scriptEngine, reduceScript});
-            return this;
-        }
-
-        /** Script overload of {@link #reduceKeySort(String, String)} using the default script engine. */
-        public Builder<MK, MV, RK, RV, R> reduceKeySort(final String setupScript) {
-            return reduceKeySort(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
-        }
-
-        ////////////
-
-        /** Stores the final-result ("memory") lambda as an object under {@link #MEMORY_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> memory(final Function<Iterator<KeyValue<RK, RV>>, R> memoryLambda) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, MEMORY_LAMBDA, memoryLambda);
-            return this;
-        }
-
-        /** Stores the final-result ("memory") lambda as a class under {@link #MEMORY_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> memory(final Class<? extends Function<Iterator<KeyValue<RK, RV>>, R>> memoryClass) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, MEMORY_LAMBDA, memoryClass);
-            return this;
-        }
-
-        /** Stores the final-result ("memory") lambda as a script (engine name, script text) under {@link #MEMORY_LAMBDA}. */
-        public Builder<MK, MV, RK, RV, R> memory(final String scriptEngine, final String memoryScript) {
-            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, MEMORY_LAMBDA, new String[]{scriptEngine, memoryScript});
-            return this;
-        }
-
-        /** Script overload of {@link #memory(String, String)} using the default script engine. */
-        public Builder<MK, MV, RK, RV, R> memory(final String setupScript) {
-            return memory(AbstractVertexProgramBuilder.GREMLIN_GROOVY, setupScript);
-        }
-
-        ////////////
-
-        /** Sets the memory key property ({@link LambdaMapReduce#MEMORY_KEY}) in the builder's configuration. */
-        public Builder<MK, MV, RK, RV, R> memoryKey(final String memoryKey) {
-            this.configuration.setProperty(LambdaMapReduce.MEMORY_KEY, memoryKey);
-            return this;
-        }
-
-        /**
-         * Creates a new {@link LambdaMapReduce} and loads it from everything recorded on this builder.
-         *
-         * @return the newly created instance
-         */
-        public LambdaMapReduce<MK, MV, RK, RV, R> create() {
-            final LambdaMapReduce<MK, MV, RK, RV, R> lambdaMapReduce = new LambdaMapReduce<>();
-            lambdaMapReduce.loadState(this.configuration);
-            return lambdaMapReduce;
-        }
-    }
-}