You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:50 UTC

[24/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop directories to org/apache/tinkerpop

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
new file mode 100644
index 0000000..870c155
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traversal.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.process.computer.ComputerResult;
+import com.tinkerpop.gremlin.process.computer.GraphComputer;
+import com.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import com.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
+import com.tinkerpop.gremlin.process.graph.traversal.GraphTraversal;
+import com.tinkerpop.gremlin.process.traversal.DefaultTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.EmptyStep;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.BulkSet;
+import com.tinkerpop.gremlin.structure.Graph;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Traversal} represents a directed walk over a {@link Graph}.
+ * This is the base interface for all traversals, where each extending interface is seen as a domain specific language.
+ * For example, {@link GraphTraversal} is a domain specific language for traversing a graph using "graph concepts" (e.g. vertices, edges).
+ * Another example may represent the graph using "social concepts" (e.g. people, cities, artifacts).
+ * A {@link Traversal} is evaluated in one of two ways: {@link TraversalEngine#STANDARD} (OLTP) and {@link TraversalEngine#COMPUTER} (OLAP).
+ * OLTP traversals leverage an iterator and are executed within a single JVM (with data access allowed to be remote).
+ * OLAP traversals leverage {@link GraphComputer} and are executed between multiple JVMs (and/or cores).
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable {
+
+    /**
+     * Used for reflection based access to the static "of" method of a Traversal.
+     */
+    public static final String OF = "of";
+
+    /**
+     * Get access to administrative methods of the traversal via its accompanying {@link Traversal.Admin}.
+     * This is a plain cast of {@code this}; an implementation that is not also a {@link Traversal.Admin}
+     * fails here with a {@link ClassCastException}.
+     *
+     * @return this traversal viewed as its admin
+     */
+    public default Traversal.Admin<S, E> asAdmin() {
+        return (Traversal.Admin<S, E>) this;
+    }
+
+    /**
+     * Submit the traversal to a {@link GraphComputer} for OLAP execution.
+     * The traversal is executed via a {@link TraversalVertexProgram} and the resulting {@link ComputerResult}
+     * is wrapped in a new {@link Traversal} containing a {@link ComputerResultStep}.
+     *
+     * @param computer the GraphComputer to execute the traversal on
+     * @return a new traversal with the starts being the results of the TraversalVertexProgram
+     * @throws IllegalStateException if building, submitting or awaiting the computation fails; the original
+     *                               exception is attached as the cause
+     */
+    public default Traversal<S, E> submit(final GraphComputer computer) {
+        try {
+            final TraversalVertexProgram vertexProgram = TraversalVertexProgram.build().traversal(this.asAdmin()).create();
+            final ComputerResult result = computer.program(vertexProgram).submit().get();
+            final Traversal.Admin<S, S> traversal = new DefaultTraversal<>(result.graph().getClass());
+            return traversal.asAdmin().addStep(new ComputerResultStep<>(traversal, result, vertexProgram, true));
+        } catch (final Exception e) {
+            // Do not lose an interrupt: restore the flag so code further up the stack can still observe it.
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt();
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Return the next E object of the traversal wrapped in an {@link Optional}.
+     * When {@link #hasNext()} reports no further results, {@link Optional#empty()} is returned instead.
+     *
+     * @return an optional of the next object in the traversal
+     */
+    public default Optional<E> tryNext() {
+        if (!this.hasNext())
+            return Optional.empty();
+        return Optional.of(this.next());
+    }
+
+    /**
+     * Pull up to {@code amount} results from the traversal.
+     * If the traversal has fewer than {@code amount} results remaining, then only those results are returned.
+     *
+     * @param amount the maximum number of results to get
+     * @return the results, in iteration order, in a {@link List}
+     */
+    public default List<E> next(final int amount) {
+        final List<E> batch = new ArrayList<>();
+        for (int taken = 0; taken < amount && this.hasNext(); taken++) {
+            batch.add(this.next());
+        }
+        return batch;
+    }
+
+    /**
+     * Drain the traversal into a freshly created {@link ArrayList} by delegating to {@link #fill(Collection)}.
+     *
+     * @return the results in a list
+     */
+    public default List<E> toList() {
+        final List<E> results = new ArrayList<>();
+        return this.fill(results);
+    }
+
+    /**
+     * Drain the traversal into a freshly created {@link HashSet} by delegating to {@link #fill(Collection)}.
+     *
+     * @return the results in a set
+     */
+    public default Set<E> toSet() {
+        final Set<E> results = new HashSet<>();
+        return this.fill(results);
+    }
+
+    /**
+     * Drain the traversal into a freshly created {@link BulkSet} by delegating to {@link #fill(Collection)}.
+     * This can reduce both time and space when aggregating results by ensuring a weighted set.
+     *
+     * @return the results in a bulk set
+     */
+    public default BulkSet<E> toBulkSet() {
+        final BulkSet<E> results = new BulkSet<>();
+        return this.fill(results);
+    }
+
+    /**
+     * Add all the results of the traversal to the provided collection.
+     * If no {@link TraversalEngine} has been declared yet, the strategies are first applied for {@link TraversalEngine#STANDARD}.
+     * Each traverser's value is added together with its bulk via {@link TraversalHelper#addToCollection}.
+     *
+     * @param collection the collection to fill
+     * @return the collection now filled
+     */
+    public default <C extends Collection<E>> C fill(final C collection) {
+        try {
+            final boolean engineDeclared = this.asAdmin().getEngine().isPresent();
+            if (!engineDeclared) {
+                this.asAdmin().applyStrategies(TraversalEngine.STANDARD);
+            }
+            // work on the end step directly so the results stay bulked
+            final Step<?, E> lastStep = this.asAdmin().getEndStep();
+            for (; ; ) {
+                final Traverser<E> current = lastStep.next();
+                TraversalHelper.addToCollection(collection, current.get(), current.bulk());
+            }
+        } catch (final NoSuchElementException ignored) {
+            // the loop above only ends through this exception: there is nothing left to collect
+        }
+        return collection;
+    }
+
+    /**
+     * Iterate all the {@link Traverser} instances in the traversal, discarding them.
+     * If no {@link TraversalEngine} has been declared yet, the strategies are first applied for {@link TraversalEngine#STANDARD}.
+     * It is assumed that what is desired from the computation are the sideEffects yielded by the traversal.
+     *
+     * @return this same traversal, now fully drained
+     */
+    public default <A, B> Traversal<A, B> iterate() {
+        try {
+            final boolean engineDeclared = this.asAdmin().getEngine().isPresent();
+            if (!engineDeclared) {
+                this.asAdmin().applyStrategies(TraversalEngine.STANDARD);
+            }
+            // work on the end step directly so the results stay bulked
+            final Step<?, E> lastStep = this.asAdmin().getEndStep();
+            for (; ; ) {
+                lastStep.next();
+            }
+        } catch (final NoSuchElementException ignored) {
+            // the loop above only ends through this exception: the traversal is drained
+        }
+        return (Traversal<A, B>) this;
+    }
+
+    /**
+     * A traversal can be rewritten such that its defined end type E may yield objects of a different type.
+     * This helper method allows for the casting of the output to the known type.
+     * The cast is unchecked and {@code endType} is not consulted by this method; it only binds {@code E2} for the caller.
+     *
+     * @param endType  the true output type of the traversal
+     * @param consumer a {@link Consumer} to process each output
+     * @param <E2>     the known output type of the traversal
+     */
+    public default <E2> void forEachRemaining(final Class<E2> endType, final Consumer<E2> consumer) {
+        try {
+            for (; ; ) {
+                final E2 result = (E2) this.next();
+                consumer.accept(result);
+            }
+        } catch (final NoSuchElementException ignored) {
+            // the loop above only ends through this exception: iteration is over
+        }
+    }
+
+    /**
+     * Factory methods for the {@link Exception} instances associated with Traversal execution.
+     */
+    public static class Exceptions {
+
+        /**
+         * @return an {@link IllegalStateException} stating that the strategies are complete and the traversal can no longer be modulated
+         */
+        public static IllegalStateException traversalIsLocked() {
+            final String message = "The traversal strategies are complete and the traversal can no longer be modulated";
+            return new IllegalStateException(message);
+        }
+
+        /**
+         * @return an {@link IllegalStateException} stating that the traversal contains steps that are not reversible
+         */
+        public static IllegalStateException traversalIsNotReversible() {
+            final String message = "The traversal is not reversible as it contains steps that are not reversible";
+            return new IllegalStateException(message);
+        }
+    }
+
+    public interface Admin<S, E> extends Traversal<S, E> {
+
+        /**
+         * Add an iterator of {@link Traverser} objects to the head/start of the traversal.
+         * If no {@link TraversalEngine} has been declared yet, the strategies are first applied for {@link TraversalEngine#STANDARD}.
+         * Users should typically not need to call this method. For dynamic injection of data, they should use {@link com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.InjectStep}.
+         *
+         * @param starts an iterator of traversers
+         */
+        public default void addStarts(final Iterator<Traverser<S>> starts) {
+            final boolean engineDeclared = this.getEngine().isPresent();
+            if (!engineDeclared) {
+                this.applyStrategies(TraversalEngine.STANDARD);
+            }
+            this.getStartStep().addStarts(starts);
+        }
+
+        /**
+         * Add a single {@link Traverser} object to the head of the traversal.
+         * If no {@link TraversalEngine} has been declared yet, the strategies are first applied for {@link TraversalEngine#STANDARD}.
+         * Users should typically not need to call this method. For dynamic injection of data, they should use {@link com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.InjectStep}.
+         *
+         * @param start a traverser to add to the traversal
+         */
+        public default void addStart(final Traverser<S> start) {
+            final boolean engineDeclared = this.getEngine().isPresent();
+            if (!engineDeclared) {
+                this.applyStrategies(TraversalEngine.STANDARD);
+            }
+            this.getStartStep().addStart(start);
+        }
+
+        /**
+         * Get the {@link Step} instances associated with this traversal.
+         * The steps are ordered according to their linked list structure as defined by {@link Step#getPreviousStep()} and {@link Step#getNextStep()}.
+         *
+         * @return the ordered steps of the traversal
+         */
+        public List<Step> getSteps();
+
+        /**
+         * Append a {@link Step} to the traversal by inserting it at the index equal to the current number of steps.
+         * The indexed {@code addStep} is expected to link the step to its next and previous step accordingly.
+         *
+         * @param step the step to add
+         * @param <E2> the output of the step
+         * @return the updated traversal
+         * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+         */
+        public default <E2> Traversal.Admin<S, E2> addStep(final Step<?, E2> step) throws IllegalStateException {
+            final int endIndex = this.getSteps().size();
+            return this.addStep(endIndex, step);
+        }
+
+        /**
+         * Add a {@link Step} to an arbitrary point in the traversal.
+         *
+         * @param index the location in the traversal to insert the step
+         * @param step  the step to add
+         * @param <S2>  the new start type of the traversal (if the added step was a start step)
+         * @param <E2>  the new end type of the traversal (if the added step was an end step)
+         * @return the newly modulated traversal
+         * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+         */
+        public <S2, E2> Traversal.Admin<S2, E2> addStep(final int index, final Step<?, ?> step) throws IllegalStateException;
+
+        /**
+         * Remove a {@link Step} from the traversal.
+         * The step is located with {@link List#indexOf(Object)} on {@link #getSteps()} and that index is handed to the indexed {@code removeStep}.
+         *
+         * @param step the step to remove
+         * @param <S2> the new start type of the traversal (if the removed step was a start step)
+         * @param <E2> the new end type of the traversal (if the removed step was an end step)
+         * @return the newly modulated traversal
+         * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+         */
+        public default <S2, E2> Traversal.Admin<S2, E2> removeStep(final Step<?, ?> step) throws IllegalStateException {
+            final int position = this.getSteps().indexOf(step);
+            return this.removeStep(position);
+        }
+
+        /**
+         * Remove a {@link Step} from the traversal.
+         *
+         * @param index the location in the traversal of the step to be evicted
+         * @param <S2>  the new start type of the traversal (if the removed step was a start step)
+         * @param <E2>  the new end type of the traversal (if the removed step was an end step)
+         * @return the newly modulated traversal
+         * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+         */
+        public <S2, E2> Traversal.Admin<S2, E2> removeStep(final int index) throws IllegalStateException;
+
+        /**
+         * Get the start/head of the traversal, i.e. the first element of {@link #getSteps()}.
+         * If the traversal has no steps, then an {@link EmptyStep} instance is returned.
+         *
+         * @return the start step of the traversal
+         */
+        public default Step<S, ?> getStartStep() {
+            final List<Step> steps = this.getSteps();
+            if (steps.isEmpty())
+                return EmptyStep.instance();
+            return steps.get(0);
+        }
+
+        /**
+         * Get the end/tail of the traversal, i.e. the last element of {@link #getSteps()}.
+         * If the traversal has no steps, then an {@link EmptyStep} instance is returned.
+         *
+         * @return the end step of the traversal
+         */
+        public default Step<?, E> getEndStep() {
+            final List<Step> steps = this.getSteps();
+            if (steps.isEmpty())
+                return EmptyStep.instance();
+            final int lastIndex = steps.size() - 1;
+            return steps.get(lastIndex);
+        }
+
+        /**
+         * Apply the registered {@link TraversalStrategies} to the traversal.
+         * Once the strategies are applied, the traversal is "locked" and can no longer have steps added to it.
+         * The order of operations for strategy applications should be: globally id steps, apply strategies to root traversal, then to nested traversals.
+         *
+         * @param engine the engine that will ultimately execute the traversal.
+         * @throws IllegalStateException if the {@link TraversalStrategies} have already been applied
+         */
+        public void applyStrategies(final TraversalEngine engine) throws IllegalStateException;
+
+        /**
+         * When the {@link TraversalStrategies} have been applied, the destined {@link TraversalEngine} has been declared.
+         * Once a traversal engine has been declared, the traversal can no longer be extended, only executed.
+         *
+         * @return the declared traversal engine, or an empty {@link Optional} if no engine has been declared yet.
+         */
+        public Optional<TraversalEngine> getEngine();
+
+        /**
+         * Get the {@link TraverserGenerator} associated with this traversal.
+         * It is obtained from the traverser generator factory of this traversal's {@link TraversalStrategies}.
+         * The traversal generator creates {@link Traverser} instances that are respective of the traversal's {@link com.tinkerpop.gremlin.process.traverser.TraverserRequirement}.
+         *
+         * @return the generator of traversers
+         */
+        public default TraverserGenerator getTraverserGenerator() {
+            final TraversalStrategies strategies = this.getStrategies();
+            return strategies.getTraverserGeneratorFactory().getTraverserGenerator(this);
+        }
+
+        /**
+         * Get the set of all {@link TraverserRequirement}s for this traversal.
+         * The result is the union of every step's requirements, plus {@link TraverserRequirement#SIDE_EFFECTS} when
+         * the sideEffects have keys, {@link TraverserRequirement#SACK} when a sack initial value is present, and
+         * {@link TraverserRequirement#BULK} when the declared engine is {@link TraversalEngine#COMPUTER}.
+         *
+         * @return the features of a traverser that are required to execute properly in this traversal
+         */
+        public default Set<TraverserRequirement> getTraverserRequirements() {
+            final Set<TraverserRequirement> required = this.getSteps().stream()
+                    .map(step -> (Step<?, ?>) step)
+                    .flatMap(step -> step.getRequirements().stream())
+                    .collect(Collectors.toSet());
+            if (!this.getSideEffects().keys().isEmpty()) {
+                required.add(TraverserRequirement.SIDE_EFFECTS);
+            }
+            if (this.getSideEffects().getSackInitialValue().isPresent()) {
+                required.add(TraverserRequirement.SACK);
+            }
+            if (this.getEngine().isPresent() && this.getEngine().get() == TraversalEngine.COMPUTER) {
+                required.add(TraverserRequirement.BULK);
+            }
+            return required;
+        }
+
+        /**
+         * Call the {@link Step#reset} method on every step in the traversal, in the order given by {@link #getSteps()}.
+         */
+        public default void reset() {
+            for (final Step step : this.getSteps()) {
+                step.reset();
+            }
+        }
+
+        /**
+         * Assume that every {@link Step} implements {@link Reversible} and call {@link Reversible#reverse()} for each.
+         *
+         * @return the traversal with its steps reversed
+         * @throws IllegalStateException if {@link TraversalHelper#isReversible} rejects this traversal
+         */
+        public default Traversal.Admin<S, E> reverse() throws IllegalStateException {
+            if (!TraversalHelper.isReversible(this)) {
+                throw Exceptions.traversalIsNotReversible();
+            }
+            for (final Step step : this.getSteps()) {
+                ((Reversible) step).reverse();
+            }
+            return this;
+        }
+
+        /**
+         * Set the {@link TraversalSideEffects} of this traversal.
+         *
+         * @param sideEffects the sideEffects to set for this traversal.
+         */
+        public void setSideEffects(final TraversalSideEffects sideEffects);
+
+        /**
+         * Get the {@link TraversalSideEffects} associated with the traversal.
+         *
+         * @return The traversal sideEffects
+         */
+        public TraversalSideEffects getSideEffects();
+
+        /**
+         * Set the {@link TraversalStrategies} to be used by this traversal at evaluation time.
+         *
+         * @param strategies the strategies to use on this traversal
+         */
+        public void setStrategies(final TraversalStrategies strategies);
+
+        /**
+         * Get the {@link TraversalStrategies} associated with this traversal.
+         *
+         * @return the strategies associated with this traversal
+         */
+        public TraversalStrategies getStrategies();
+
+        /**
+         * Set the {@link com.tinkerpop.gremlin.process.traversal.step.TraversalParent} {@link Step} that is the parent of this traversal.
+         * Traversals can be nested and this is the means by which the traversal tree is connected.
+         *
+         * @param step the traversal holder parent step
+         */
+        public void setParent(final TraversalParent step);
+
+        /**
+         * Get the {@link com.tinkerpop.gremlin.process.traversal.step.TraversalParent} {@link Step} that is the parent of this traversal.
+         * Traversals can be nested and this is the means by which the traversal tree is walked.
+         *
+         * @return the traversal holder parent step
+         */
+        public TraversalParent getParent();
+
+        /**
+         * Cloning is used to duplicate the traversal typically in OLAP environments.
+         *
+         * @return The cloned traversal
+         */
+        public Traversal.Admin<S, E> clone() throws CloneNotSupportedException;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java
new file mode 100644
index 0000000..ad14db6
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalEngine.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+/**
+ * The engine a traversal is evaluated with.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum TraversalEngine {
+
+    /**
+     * OLTP evaluation.
+     */
+    STANDARD,
+
+    /**
+     * OLAP evaluation.
+     */
+    COMPUTER
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java
new file mode 100644
index 0000000..f44e287
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalSideEffects.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.structure.Vertex;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface TraversalSideEffects extends Cloneable, Serializable {
+
+    public static final String SIDE_EFFECTS = "gremlin.traversal.sideEffects";
+
+    /**
+     * Determines if the {@link TraversalSideEffects} contains the respective key, i.e. whether {@link #keys()} contains it.
+     * If the key references a stored {@link java.util.function.Supplier}, then it should return true as it will be dynamically created on get().
+     *
+     * @param key the key to check for
+     * @return whether the key exists in the sideEffects
+     */
+    public default boolean exists(final String key) {
+        final Set<String> knownKeys = this.keys();
+        return knownKeys.contains(key);
+    }
+
+    /**
+     * Set the specified key to the specified value.
+     * If a {@link java.util.function.Supplier} is provided, it is NOT assumed to be a supplier as set by registerSupplier().
+     *
+     * @param key   the key
+     * @param value the value
+     */
+    public void set(final String key, final Object value);
+
+    /**
+     * Get the sideEffect associated with the provided key.
+     * If the sideEffect contains an object for the key, return it.
+     * Else if the sideEffect has a registered {@link java.util.function.Supplier} for that key, generate the object, store the object in the sideEffects, and return it.
+     *
+     * @param key the key to get the value for
+     * @param <V> the type of the value to retrieve
+     * @return the value associated with key
+     * @throws IllegalArgumentException if the key does not reference an object or a registered supplier.
+     */
+    public <V> V get(final String key) throws IllegalArgumentException;
+
+    /**
+     * Return the value associated with the key or return the provided otherValue.
+     * The otherValue will not be stored in the sideEffect.
+     *
+     * @param key        the key to get the value for
+     * @param otherValue if no value is associated with key, return the other value.
+     * @param <V>        the type of the value to get
+     * @return the value associated with the key or the otherValue
+     */
+    public default <V> V orElse(final String key, final V otherValue) {
+        if (this.exists(key))
+            return this.get(key);
+        return otherValue;
+    }
+
+    /**
+     * If a value or registered {@link java.util.function.Supplier} exists for the provided key, consume it with the provided consumer.
+     * When the key does not exist, the consumer is never invoked.
+     *
+     * @param key      the key to the value
+     * @param consumer the consumer to process the value
+     * @param <V>      the type of the value to consume
+     */
+    public default <V> void ifPresent(final String key, final Consumer<V> consumer) {
+        if (!this.exists(key))
+            return;
+        final V value = this.get(key);
+        consumer.accept(value);
+    }
+
+    /**
+     * Remove both the value and registered {@link java.util.function.Supplier} associated with provided key.
+     *
+     * @param key the key of the value and registered supplier to remove
+     */
+    public void remove(final String key);
+
+    /**
+     * The keys of the sideEffect which includes registered {@link java.util.function.Supplier} keys.
+     * In essence, that which is possible to get().
+     *
+     * @return the keys of the sideEffect
+     */
+    public Set<String> keys();
+
+    ////////////
+
+    /**
+     * Register a {@link java.util.function.Supplier} with the provided key.
+     * When sideEffects get() are called, if no object exists and there exists a registered supplier for the key, the object is generated.
+     * Registered suppliers are used for the lazy generation of sideEffect data.
+     *
+     * @param key      the key to register the supplier with
+     * @param supplier the supplier that will generate an object when get() is called if it hasn't already been created
+     */
+    public void registerSupplier(final String key, final Supplier supplier);
+
+    /**
+     * Get the registered {@link java.util.function.Supplier} associated with the specified key.
+     *
+     * @param key the key associated with the supplier
+     * @param <V> The object type of the supplier
+     * @return A non-empty optional if the supplier exists
+     */
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key);
+
+    /**
+     * A helper method to register a {@link java.util.function.Supplier} if it has not already been registered.
+     * A key that already has a registered supplier is left untouched.
+     *
+     * @param key      the key of the supplier to register
+     * @param supplier the supplier to register if the key has not already been registered
+     */
+    public default void registerSupplierIfAbsent(final String key, final Supplier supplier) {
+        final boolean alreadyRegistered = this.getRegisteredSupplier(key).isPresent();
+        if (alreadyRegistered)
+            return;
+        this.registerSupplier(key, supplier);
+    }
+
+    public <S> void setSack(final Supplier<S> initialValue, final Optional<UnaryOperator<S>> splitOperator);
+
+    public <S> Optional<Supplier<S>> getSackInitialValue();
+
+    public <S> Optional<UnaryOperator<S>> getSackSplitOperator();
+
+    /**
+     * If the sideEffect contains an object associated with the key, return it.
+     * Else if a "with" supplier exists for the key, generate the object, store it in the sideEffects and return the object.
+     * Else use the provided supplier to generate the object, store it in the sideEffects and return the object.
+     * Note that if the orCreate supplier is used, it is NOT registered as a {@link java.util.function.Supplier}.
+     *
+     * @param key      the key of the object to get
+     * @param orCreate if the object doesn't exist as an object or suppliable object, then generate it with the specified supplier
+     * @param <V>      the return type of the object
+     * @return the object that is either retrieved, generated, or supplied via orCreate
+     */
+    public default <V> V getOrCreate(final String key, final Supplier<V> orCreate) {
+        if (this.exists(key)) {
+            return this.<V>get(key);
+        }
+        // prefer the registered supplier; otherwise fall back to the caller's supplier
+        final Optional<Supplier<V>> registered = this.getRegisteredSupplier(key);
+        final V created = registered.orElse(orCreate).get();
+        this.set(key, created);
+        return created;
+    }
+
+    ////////////
+
+    /**
+     * Hand every key of {@link #keys()}, together with the value that get() returns for it, to the provided consumer.
+     *
+     * @param biConsumer the consumer to process each key/value pair
+     * @param <V>        the type the values are presented as
+     */
+    public default <V> void forEach(final BiConsumer<String, V> biConsumer) {
+        for (final String key : this.keys()) {
+            biConsumer.accept(key, this.<V>get(key));
+        }
+    }
+
+    /**
+     * In a distributed {@link com.tinkerpop.gremlin.process.computer.GraphComputer} traversal, the sideEffects of the traversal are not a single object within a single JVM.
+     * Instead, the sideEffects are distributed across the graph and the pieces are stored on the computing vertices.
+     * This method is necessary to call when the {@link com.tinkerpop.gremlin.process.Traversal} is processing the {@link com.tinkerpop.gremlin.process.Traverser}s at a particular {@link com.tinkerpop.gremlin.structure.Vertex}.
+     *
+     * @param vertex the vertex where the traversal is currently executing.
+     */
+    public void setLocalVertex(final Vertex vertex);
+
+    /**
+     * Cloning is used to duplicate the sideEffects typically in OLAP environments.
+     *
+     * @return The cloned sideEffects
+     */
+    public TraversalSideEffects clone() throws CloneNotSupportedException;
+
+    /**
+     * Add the current {@link TraversalSideEffects} data and suppliers to the provided {@link TraversalSideEffects}.
+     *
+     * @param sideEffects the sideEffects to add this traversal's sideEffect data to.
+     */
+    public void mergeInto(final TraversalSideEffects sideEffects);
+
+    /**
+     * Factory methods for the exceptions associated with sideEffect access.
+     */
+    public static class Exceptions {
+
+        /**
+         * @return an {@link IllegalArgumentException} stating that a side effect key can not be the empty string
+         */
+        public static IllegalArgumentException sideEffectKeyCanNotBeEmpty() {
+            return new IllegalArgumentException("Side effect key can not be the empty string");
+        }
+
+        /**
+         * @return an {@link IllegalArgumentException} stating that a side effect key can not be null
+         */
+        public static IllegalArgumentException sideEffectKeyCanNotBeNull() {
+            return new IllegalArgumentException("Side effect key can not be null");
+        }
+
+        /**
+         * @return an {@link IllegalArgumentException} stating that a side effect value can not be null
+         */
+        public static IllegalArgumentException sideEffectValueCanNotBeNull() {
+            return new IllegalArgumentException("Side effect value can not be null");
+        }
+
+        /**
+         * @param key the key that has no value
+         * @return an {@link IllegalArgumentException} naming the key that has no value
+         */
+        public static IllegalArgumentException sideEffectDoesNotExist(final String key) {
+            return new IllegalArgumentException("Side effects do not have a value for provided key: " + key);
+        }
+
+        /**
+         * @param val the unsupported value; may be null, in which case its type is reported as "null"
+         * @return an {@link UnsupportedOperationException} naming the value and its class
+         */
+        public static UnsupportedOperationException dataTypeOfSideEffectValueNotSupported(final Object val) {
+            // guard against null so that building the exception never itself fails with a NullPointerException
+            final Object type = null == val ? null : val.getClass();
+            return new UnsupportedOperationException(String.format("Side effect value [%s] is of type %s is not supported", val, type));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java
new file mode 100644
index 0000000..c5cf221
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategies.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.process.graph.traversal.__;
+import com.tinkerpop.gremlin.process.graph.traversal.strategy.*;
+import com.tinkerpop.gremlin.process.traversal.DefaultTraversalStrategies;
+import com.tinkerpop.gremlin.process.traverser.TraverserGeneratorFactory;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.VertexProperty;
+import com.tinkerpop.gremlin.util.tools.MultiMap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface TraversalStrategies extends Serializable, Cloneable {
+
+    /**
+     * Return all the {@link TraversalStrategy} singleton instances associated with this {@link TraversalStrategies}.
+     */
+    public List<TraversalStrategy> toList();
+
+    /**
+     * Apply all the {@link TraversalStrategy} optimizers to the {@link Traversal} for the stated {@link TraversalEngine}.
+     * This method must ensure that the strategies are sorted prior to application.
+     *
+     * @param traversal the traversal to apply the strategies to
+     * @param engine    the engine that the traversal is going to be executed on
+     */
+    public void applyStrategies(final Traversal.Admin<?, ?> traversal, final TraversalEngine engine);
+
+    /**
+     * Add all the provided {@link TraversalStrategy} instances to the current collection.
+     * When all the provided strategies have been added, the collection is resorted.
+     *
+     * @param strategies the traversal strategies to add
+     * @return the newly updated/sorted traversal strategies collection
+     */
+    public TraversalStrategies addStrategies(final TraversalStrategy... strategies);
+
+    /**
+     * Remove all the provided {@link TraversalStrategy} classes from the current collection.
+     * When all the provided strategies have been removed, the collection is resorted.
+     *
+     * @param strategyClasses the traversal strategies to remove by their class
+     * @return the newly updated/sorted traversal strategies collection
+     */
+    @SuppressWarnings("unchecked")
+    public TraversalStrategies removeStrategies(final Class<? extends TraversalStrategy>... strategyClasses);
+
+    /**
+     * {@inheritDoc}
+     */
+    public TraversalStrategies clone() throws CloneNotSupportedException;
+
+    /**
+     * Get the {@link TraverserGeneratorFactory} to use to generate traversers.
+     */
+    public TraverserGeneratorFactory getTraverserGeneratorFactory();
+
+    /**
+     * Set the {@link TraverserGeneratorFactory} to use for determining which {@link Traverser} type to generate for the {@link Traversal}.
+     *
+     * @param traverserGeneratorFactory the factory to use
+     */
+    public void setTraverserGeneratorFactory(final TraverserGeneratorFactory traverserGeneratorFactory);
+
+    /**
+     * Sorts the list of provided strategies such that the {@link com.tinkerpop.gremlin.process.TraversalStrategy#applyPost()}
+     * and {@link TraversalStrategy#applyPrior()} dependencies are respected.
+     * <p/>
+     * Note, that the order may not be unique.
+     *
+     * @param strategies the traversal strategies to sort
+     */
+    public static void sortStrategies(final List<? extends TraversalStrategy> strategies) {
+        final Map<Class<? extends TraversalStrategy>, Set<Class<? extends TraversalStrategy>>> dependencyMap = new HashMap<>();
+        final Set<Class<? extends TraversalStrategy>> strategyClass = new HashSet<>(strategies.size());
+        //Initialize data structure
+        strategies.forEach(s -> strategyClass.add(s.getClass()));
+
+        //Initialize all the dependencies
+        strategies.forEach(strategy -> {
+            strategy.applyPrior().forEach(s -> {
+                if (strategyClass.contains(s)) MultiMap.put(dependencyMap, s, strategy.getClass());
+            });
+            strategy.applyPost().forEach(s -> {
+                if (strategyClass.contains(s)) MultiMap.put(dependencyMap, strategy.getClass(), s);
+            });
+        });
+        //Now, compute transitive closure until convergence
+        boolean updated;
+        do {
+            updated = false;
+            for (final Class<? extends TraversalStrategy> sc : strategyClass) {
+                List<Class<? extends TraversalStrategy>> toAdd = null;
+                for (Class<? extends TraversalStrategy> before : MultiMap.get(dependencyMap, sc)) {
+                    final Set<Class<? extends TraversalStrategy>> beforeDep = MultiMap.get(dependencyMap, before);
+                    if (!beforeDep.isEmpty()) {
+                        if (toAdd == null) toAdd = new ArrayList<>(beforeDep.size());
+                        toAdd.addAll(beforeDep);
+                    }
+                }
+                if (toAdd != null && MultiMap.putAll(dependencyMap, sc, toAdd)) updated = true;
+            }
+        } while (updated);
+        Collections.sort(strategies, new Comparator<TraversalStrategy>() {
+            @Override
+            public int compare(final TraversalStrategy s1, final TraversalStrategy s2) {
+                boolean s1Before = MultiMap.containsEntry(dependencyMap, s1.getClass(), s2.getClass());
+                boolean s2Before = MultiMap.containsEntry(dependencyMap, s2.getClass(), s1.getClass());
+                if (s1Before && s2Before)
+                    throw new IllegalStateException("Cyclic dependency between traversal strategies: ["
+                            + s1.getClass().getName() + ", " + s2.getClass().getName() + ']');
+                if (s1Before) return -1;
+                else if (s2Before) return 1;
+                else return 0;
+            }
+        });
+    }
+
+    public static final class GlobalCache {
+
+        private static final Map<Class, TraversalStrategies> CACHE = new HashMap<>();
+
+        static {
+            final TraversalStrategies coreStrategies = new DefaultTraversalStrategies();
+            coreStrategies.addStrategies(
+                    DedupOptimizerStrategy.instance(),
+                    RangeByIsCountStrategy.instance(),
+                    IdentityRemovalStrategy.instance(),
+                    SideEffectCapStrategy.instance(),
+                    MatchWhereStrategy.instance(),
+                    ComparatorHolderRemovalStrategy.instance(),
+                    ReducingStrategy.instance(),
+                    LabeledEndStepStrategy.instance(),
+                    EngineDependentStrategy.instance(),
+                    ProfileStrategy.instance(),
+                    SideEffectRegistrationStrategy.instance(),
+                    TraversalVerificationStrategy.instance(),
+                    ConjunctionStrategy.instance());
+            try {
+                CACHE.put(Graph.class, coreStrategies.clone());
+                CACHE.put(Vertex.class, coreStrategies.clone());
+                CACHE.put(Edge.class, coreStrategies.clone());
+                CACHE.put(VertexProperty.class, coreStrategies.clone());
+                CACHE.put(__.class, new DefaultTraversalStrategies());
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        public static void registerStrategies(final Class emanatingClass, final TraversalStrategies traversalStrategies) {
+            CACHE.put(emanatingClass, traversalStrategies);
+        }
+
+        public static TraversalStrategies getStrategies(final Class emanatingClass) {
+            final TraversalStrategies traversalStrategies = CACHE.get(emanatingClass);
+            if (null == traversalStrategies) {
+                if (__.class.isAssignableFrom(emanatingClass))
+                    return CACHE.get(__.class);
+                else if (Graph.class.isAssignableFrom(emanatingClass))
+                    return CACHE.get(Graph.class);
+                else if (Vertex.class.isAssignableFrom(emanatingClass))
+                    return CACHE.get(Vertex.class);
+                else if (Edge.class.isAssignableFrom(emanatingClass))
+                    return CACHE.get(Edge.class);
+                else if (VertexProperty.class.isAssignableFrom(emanatingClass))
+                    return CACHE.get(VertexProperty.class);
+                else
+                    return new DefaultTraversalStrategies();
+                // throw new IllegalStateException("The provided class has no registered traversal strategies: " + emanatingClass);
+            }
+            return traversalStrategies;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java
new file mode 100644
index 0000000..1ed0245
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraversalStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A {@link TraversalStrategy} defines a particular atomic operation for mutating a {@link Traversal} prior to its evaluation.
+ * Traversal strategies are typically used for optimizing a traversal for the particular underlying graph engine.
+ * Traversal strategies declare their relative ordering through {@link #applyPrior()} and {@link #applyPost()} and are
+ * sorted accordingly (see {@link TraversalStrategies#sortStrategies}) to determine their evaluation order.
+ * A TraversalStrategy should not have a public constructor as they should not maintain state between applications.
+ * Make use of a singleton instance() object to reduce object creation on the JVM.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface TraversalStrategy extends Serializable{
+
+    // A TraversalStrategy should not have a public constructor
+    // Make use of a singleton instance() object to reduce object creation on the JVM
+    // Moreover they are stateless objects.
+
+    /**
+     * Apply this strategy to the provided traversal.
+     *
+     * @param traversal       the traversal to apply the strategy to
+     * @param traversalEngine the engine that the traversal is going to be executed on
+     */
+    public void apply(final Traversal.Admin<?, ?> traversal, final TraversalEngine traversalEngine);
+
+    /**
+     * The strategy classes that must be applied before this strategy when both are present in the same collection.
+     *
+     * @return the classes of the strategies to apply prior to this one (empty by default)
+     */
+    public default Set<Class<? extends TraversalStrategy>> applyPrior() {
+        return Collections.emptySet();
+    }
+
+    /**
+     * The strategy classes that must be applied after this strategy when both are present in the same collection.
+     *
+     * @return the classes of the strategies to apply after this one (empty by default)
+     */
+    public default Set<Class<? extends TraversalStrategy>> applyPost() {
+        return Collections.emptySet();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java
new file mode 100644
index 0000000..dfa2ae9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Traverser.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.detached.Attachable;
+
+import java.io.Serializable;
+
+/**
+ * A {@link Traverser} represents the current state of an object flowing through a {@link Traversal}.
+ * A traverser maintains a reference to the current object, a traverser-local "sack", a traversal-global sideEffect, a bulk count, and a path history.
+ * <p/>
+ * Different types of traverser can exist depending on the semantics of the traversal and the desire for
+ * space/time optimizations of the developer.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Traverser<T> extends Serializable, Comparable<Traverser<T>>, Cloneable {
+
+    /**
+     * Get the object that the traverser is currently at.
+     *
+     * @return The current object of the traverser
+     */
+    public T get();
+
+    /**
+     * Get the local sack object of this traverser.
+     *
+     * @param <S> the type of the sack object
+     * @return the sack object
+     */
+    public <S> S sack();
+
+    /**
+     * Set the traverser's sack object to the provided value ("sack the value").
+     *
+     * @param object the new value of the traverser's sack
+     * @param <S>    the type of the object
+     */
+    public <S> void sack(final S object);
+
+    /**
+     * Get the current path of the traverser.
+     *
+     * @return The path of the traverser
+     */
+    public Path path();
+
+    /**
+     * Look up the object bound to the given step-label in this traverser's path history.
+     *
+     * @param stepLabel the step-label in the path to access
+     * @param <A>       the type of the object
+     * @return the object associated with that path label (if more than one object occurs at that step, a list is returned)
+     */
+    public default <A> A path(final String stepLabel) {
+        final Path history = this.path();
+        return history.get(stepLabel);
+    }
+
+    /**
+     * Return the number of times the traverser has gone through a looping section of a traversal.
+     *
+     * @return The number of times the traverser has gone through a loop
+     */
+    public int loops();
+
+    /**
+     * A traverser may represent a grouping of traversers to allow for more efficient data propagation.
+     *
+     * @return the number of traversers represented in this traverser.
+     */
+    public long bulk();
+
+    /**
+     * Read a value from the sideEffects that are reachable through this traverser.
+     *
+     * @param sideEffectKey the key of the value to get from the sideEffects
+     * @param <A>           the type of the returned object
+     * @return the object in the sideEffects of the respective key
+     */
+    public default <A> A sideEffects(final String sideEffectKey) {
+        final TraversalSideEffects effects = this.asAdmin().getSideEffects();
+        return effects.get(sideEffectKey);
+    }
+
+    /**
+     * Write a value into the sideEffects that are reachable through this traverser.
+     *
+     * @param sideEffectKey   the key of the value to set in the sideEffects
+     * @param sideEffectValue the value to set for the sideEffect key
+     */
+    public default void sideEffects(final String sideEffectKey, final Object sideEffectValue) {
+        final TraversalSideEffects effects = this.asAdmin().getSideEffects();
+        effects.set(sideEffectKey, sideEffectValue);
+    }
+
+    /**
+     * Compare this traverser with another one by comparing the objects they currently hold.
+     * The object of this traverser is cast to {@link Comparable} and compared against the object of the other traverser.
+     *
+     * @param other the other traverser that presumably has a comparable internal object
+     * @return the comparison of the two objects of the traversers
+     * @throws ClassCastException if the object of the traverser is not comparable
+     */
+    @Override
+    public default int compareTo(final Traverser<T> other) throws ClassCastException {
+        final Comparable current = (Comparable) this.get();
+        return current.compareTo(other.get());
+    }
+
+    /**
+     * Typecast the traverser to a "system traverser" so {@link Traverser.Admin} methods can be accessed.
+     * This is used as a helper method to avoid the awkwardness of <code>((Traverser.Admin) traverser)</code>.
+     * The default implementation simply returns "this" type casted to {@link Traverser.Admin}.
+     *
+     * @return The type-casted traverser
+     */
+    public default Admin<T> asAdmin() {
+        final Traverser<T> self = this;
+        return (Admin<T>) self;
+    }
+
+    /**
+     * Traverser cloning is important when splitting a traverser at a bifurcation point in a traversal.
+     */
+    public Traverser<T> clone() throws CloneNotSupportedException;
+
+    /**
+     * The methods in Traverser.Admin are useful to underlying Step and Traversal implementations.
+     * They should not be accessed by the user during lambda-based manipulations.
+     */
+    public interface Admin<T> extends Traverser<T>, Attachable<Admin<T>> {
+
+        public static final String HALT = "halt";
+
+        /**
+         * When two traversers are {@link Traverser#equals} to each other, then they can be merged.
+         * This method is used to merge the traversers into a single traverser.
+         * This is used for optimization where instead of enumerating all traversers, they can be counted.
+         *
+         * @param other the other traverser to merge into this traverser. Once merged, the other can be garbage collected.
+         */
+        public void merge(final Admin<?> other);
+
+        /**
+         * Generate a child traverser of the current traverser for current as step and new object location.
+         * The child has the path history, future, and loop information of the parent.
+         * The child extends that path history with the current as and provided R-object.
+         *
+         * @param r    The current object of the child
+         * @param step The step yielding the split
+         * @param <R>  The current object type of the child
+         * @return The split traverser
+         */
+        public <R> Admin<R> split(final R r, final Step<T, R> step);
+
+        /**
+         * Generate a sibling traverser of the current traverser with a full copy of all state within the sibling.
+         *
+         * @return The split traverser
+         */
+        public Admin<T> split();
+
+        /**
+         * Set the current object location of the traverser.
+         *
+         * @param t The current object of the traverser
+         */
+        public void set(final T t);
+
+        /**
+         * Increment the number of times the traverser has gone through a looping section of traversal.
+         * The step label is important to create a stack of loop counters when within a nested context.
+         * If the provided label is not the same as the current label on the stack, add a new loop counter.
+         * If the provided label is the same as the current label on the stack, increment the loop counter.
+         *
+         * @param stepLabel the label of the step that is doing the incrementing
+         */
+        public void incrLoops(final String stepLabel);
+
+        /**
+         * Set the number of times the traverser has gone through a loop back to 0.
+         * When a traverser exits a looping construct, this method should be called.
+         * In a nested loop context, the highest stack loop counter should be removed.
+         */
+        public void resetLoops();
+
+        /**
+         * Get the step id of where the traverser is located.
+         * This is typically used in multi-machine systems that require the movement of
+         * traversers between different traversal instances.
+         *
+         * @return The future step for the traverser
+         */
+        public String getStepId();
+
+        /**
+         * Set the step id of where the traverser is located.
+         * If the future is {@link Traverser.Admin#HALT}, then {@link Traverser.Admin#isHalted()} is true.
+         *
+         * @param stepId The future step of the traverser
+         */
+        public void setStepId(final String stepId);
+
+        /**
+         * If the traverser has "no future" then it is done with its lifecycle.
+         * This does not mean that the traverser is "dead," only that it has successfully passed through a {@link Traversal}.
+         * A traverser whose step id is null is not considered halted.
+         *
+         * @return Whether the traverser is done executing or not
+         */
+        public default boolean isHalted() {
+            // constant-first comparison so that a null step id yields false instead of a NullPointerException
+            return HALT.equals(getStepId());
+        }
+
+        /**
+         * Set the number of traversers represented by this traverser.
+         *
+         * @param count the number of traversers
+         */
+        public void setBulk(final long count);
+
+        /**
+         * Prepare the traverser for migration across a JVM boundary.
+         *
+         * @return The deflated traverser
+         */
+        public Admin<T> detach();
+
+        /**
+         * Regenerate the detached traverser given its location at a particular vertex.
+         *
+         * @param hostVertex The vertex that is hosting the traverser
+         * @return The inflated traverser
+         */
+        @Override
+        public Admin<T> attach(final Vertex hostVertex);
+
+        /**
+         * Attaching a traverser to a graph is not supported: this default implementation always throws.
+         *
+         * @param graph the graph to attach the traverser to, which it can't.
+         * @return nothing as an exception is thrown
+         * @throws UnsupportedOperationException is always thrown as it makes no sense to attach a traverser to a graph
+         */
+        @Override
+        public default Admin<T> attach(final Graph graph) throws UnsupportedOperationException {
+            final String reason = "A traverser can only exist at the vertices of the graph, not the graph itself";
+            throw new UnsupportedOperationException(reason);
+        }
+
+        /**
+         * Set the sideEffects of the {@link Traversal}. Given that traversers can move between machines,
+         * it may be important to re-set this when the traverser crosses machine boundaries.
+         *
+         * @param sideEffects the sideEffects of the traversal.
+         */
+        public void setSideEffects(final TraversalSideEffects sideEffects);
+
+        /**
+         * Get the sideEffects associated with the traversal of the traverser.
+         *
+         * @return the traversal sideEffects of the traverser
+         */
+        public TraversalSideEffects getSideEffects();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java
new file mode 100644
index 0000000..e0ca217
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/TraverserGenerator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process;
+
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * A generator of {@link Traverser.Admin} instances for the start objects of a {@link Step}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface TraverserGenerator {
+
+    /**
+     * @return the set of {@link TraverserRequirement} values this generator declares as provided
+     */
+    public Set<TraverserRequirement> getProvidedRequirements();
+
+    /**
+     * Generate a traverser for a single start object.
+     *
+     * @param start       the object the traverser starts at
+     * @param startStep   the step the traverser starts from
+     * @param initialBulk the initial bulk of the traverser
+     * @param <S>         the type of the start object
+     * @return the generated traverser
+     */
+    public <S> Traverser.Admin<S> generate(final S start, final Step<S, ?> startStep, final long initialBulk);
+
+    /**
+     * Wrap an iterator of start objects into an iterator of traversers. Traversers are generated lazily,
+     * one per call to {@code next()}, by delegating to {@link #generate(Object, Step, long)}.
+     *
+     * @param starts      the start objects
+     * @param startStep   the step the traversers start from
+     * @param initialBulk the initial bulk of every generated traverser
+     * @param <S>         the type of the start objects
+     * @return an iterator over the generated traversers
+     */
+    public default <S> Iterator<Traverser.Admin<S>> generateIterator(final Iterator<S> starts, final Step<S, ?> startStep, final long initialBulk) {
+        final TraverserGenerator generator = this;
+        return new Iterator<Traverser.Admin<S>>() {
+            @Override
+            public Traverser.Admin<S> next() {
+                final S start = starts.next();
+                return generator.generate(start, startStep, initialBulk);
+            }
+
+            @Override
+            public boolean hasNext() {
+                return starts.hasNext();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java
new file mode 100644
index 0000000..d7bfb90
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ComputerResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * The result of the {@link GraphComputer}'s computation. This is returned in a {@link java.util.concurrent.Future} by {@link GraphComputer#submit}.
+ * A GraphComputer computation yields two things: an updated view of the {@link Graph} that was computed on and any computational sideEffects called {@link Memory}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface ComputerResult extends AutoCloseable {
+
+    /**
+     * Get the view of the original {@link Graph} computed on by the GraphComputer.
+     *
+     * @return The computed graph
+     */
+    public Graph graph();
+
+    /**
+     * Get the computational sideEffects called {@link Memory} of the GraphComputer.
+     *
+     * @return the computed memory
+     */
+    public Memory memory();
+
+    /**
+     * Close the computed {@link GraphComputer} result. The semantics of "close" differ depending on the underlying implementation.
+     * In general, when a {@link ComputerResult} is closed, the computed values are no longer available to the user.
+     *
+     * @throws Exception if the result can not be closed
+     */
+    @Override
+    public void close() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
new file mode 100644
index 0000000..2ad3e6d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import java.util.concurrent.Future;
+
+/**
+ * The {@link GraphComputer} is responsible for the execution of a {@link VertexProgram} and then a set of {@link MapReduce} jobs
+ * over the vertices in the {@link com.tinkerpop.gremlin.structure.Graph}. It is up to the {@link GraphComputer} implementation to determine the
+ * appropriate memory structures given the computing substrate. {@link GraphComputer} implementations also
+ * maintain levels of memory {@link Isolation}: Bulk Synchronous and Dirty Bulk Synchronous.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public interface GraphComputer {
+
+    public enum Isolation {
+        /**
+         * Computations are carried out in a bulk synchronous manner.
+         * The results of a vertex property update are only visible after the round is complete.
+         */
+        BSP,
+        /**
+         * Computations are carried out in a bulk asynchronous manner.
+         * The results of a vertex property update are visible before the end of the round.
+         */
+        DIRTY_BSP
+    }
+
+    /**
+     * Set the {@link Isolation} of the computation.
+     *
+     * @param isolation the isolation of the computation
+     * @return the updated GraphComputer with newly set isolation
+     */
+    public GraphComputer isolation(final Isolation isolation);
+
+    /**
+     * Set the {@link VertexProgram} to be executed by the {@link GraphComputer}.
+     * There can only be one VertexProgram for the GraphComputer.
+     *
+     * @param vertexProgram the VertexProgram to be executed
+     * @return the updated GraphComputer with newly set VertexProgram
+     */
+    public GraphComputer program(final VertexProgram vertexProgram);
+
+    /**
+     * Add a {@link MapReduce} job to the set of MapReduce jobs to be executed by the {@link GraphComputer}.
+     * There can be any number of MapReduce jobs.
+     *
+     * @param mapReduce the MapReduce job to add to the computation
+     * @return the updated GraphComputer with newly added MapReduce job
+     */
+    public GraphComputer mapReduce(final MapReduce mapReduce);
+
+    /**
+     * Submit the {@link VertexProgram} and the set of {@link MapReduce} jobs for execution by the {@link GraphComputer}.
+     *
+     * @return a {@link Future} denoting a reference to the asynchronous computation and where to get the {@link com.tinkerpop.gremlin.process.computer.util.DefaultComputerResult} when it is complete.
+     */
+    public Future<ComputerResult> submit();
+
+    public default Features features() {
+        return new Features() {
+        };
+    }
+
+    public interface Features {
+        public default boolean supportsWorkerPersistenceBetweenIterations() {
+            return true;
+        }
+
+        public default boolean supportsGlobalMessageScopes() {
+            return true;
+        }
+
+        public default boolean supportsLocalMessageScopes() {
+            return true;
+        }
+
+        public default boolean supportsVertexAddition() {
+            return true;
+        }
+
+        public default boolean supportsVertexRemoval() {
+            return true;
+        }
+
+        public default boolean supportsVertexPropertyAddition() {
+            return true;
+        }
+
+        public default boolean supportsVertexPropertyRemoval() {
+            return true;
+        }
+
+        public default boolean supportsEdgeAddition() {
+            return true;
+        }
+
+        public default boolean supportsEdgeRemoval() {
+            return true;
+        }
+
+        public default boolean supportsEdgePropertyAddition() {
+            return true;
+        }
+
+        public default boolean supportsEdgePropertyRemoval() {
+            return true;
+        }
+
+        public default boolean supportsIsolation(final Isolation isolation) {
+            return true;
+        }
+    }
+
+    public static class Exceptions {
+        public static IllegalStateException adjacentElementPropertiesCanNotBeRead() {
+            return new IllegalStateException("The properties of an adjacent element can not be read, only its id");
+        }
+
+        public static IllegalStateException adjacentElementPropertiesCanNotBeWritten() {
+            return new IllegalStateException("The properties of an adjacent element can not be written");
+        }
+
+        public static IllegalArgumentException providedKeyIsNotAnElementComputeKey(final String key) {
+            return new IllegalArgumentException("The provided key is not an element compute key: " + key);
+        }
+
+        public static IllegalArgumentException providedKeyIsNotAMemoryComputeKey(final String key) {
+            return new IllegalArgumentException("The provided key is not a memory compute key: " + key);
+        }
+
+        public static IllegalStateException adjacentVerticesCanNotBeQueried() {
+            return new IllegalStateException("It is not possible to query an adjacent vertex in a vertex program");
+        }
+
+        public static IllegalArgumentException isolationNotSupported(final Isolation isolation) {
+            return new IllegalArgumentException("The provided isolation is not supported by this graph computer: " + isolation);
+        }
+
+        public static IllegalStateException computerHasAlreadyBeenSubmittedAVertexProgram() {
+            return new IllegalStateException("This computer has already had a vertex program submitted to it");
+        }
+
+        public static IllegalStateException computerHasNoVertexProgramNorMapReducers() {
+            return new IllegalStateException("The computer has no vertex program or map reducers to execute");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java
new file mode 100644
index 0000000..ca2af61
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/KeyValue.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class KeyValue<K, V> implements Serializable {
+
+    private final K key;
+    private final V value;
+    private static final String EMPTY_STRING = "";
+    private static final String TAB = "\t";
+    private static final String NULL = "null";
+
+    public KeyValue(final K key, final V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public K getKey() {
+        return this.key;
+    }
+
+    public V getValue() {
+        return this.value;
+    }
+
+    public String toString() {
+        if (this.key instanceof MapReduce.NullObject && this.value instanceof MapReduce.NullObject) {
+            return EMPTY_STRING;
+        } else if (this.key instanceof MapReduce.NullObject) {
+            return makeString(this.value);
+        } else if (this.value instanceof MapReduce.NullObject) {
+            return makeString(this.key);
+        } else {
+            return makeString(this.key) + TAB + makeString(this.value);
+        }
+    }
+
+    private static final String makeString(final Object object) {
+        return null == object ? NULL : object.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
new file mode 100644
index 0000000..b669aa9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer;
+
+import com.tinkerpop.gremlin.structure.Vertex;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+
+/**
+ * A MapReduce is composed of map(), combine(), and reduce() stages.
+ * The map() stage processes the vertices of the {@link com.tinkerpop.gremlin.structure.Graph} in a logically parallel manner.
+ * The combine() stage aggregates the values of a particular map emitted key prior to sending across the cluster.
+ * The reduce() stage aggregates the values of the combine/map emitted keys for the keys that hash to the current machine in the cluster.
+ * The interface presented here is nearly identical to the interface popularized by Hadoop save that the map() is over the vertices of the graph.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
+
+    public static final String MAP_REDUCE = "gremlin.mapReduce";
+
+    /**
+     * MapReduce is composed of three stages: map, combine, and reduce.
+     */
+    public static enum Stage {
+        MAP, COMBINE, REDUCE
+    }
+
+    /**
+     * When it is necessary to store the state of a MapReduce job, this method is called.
+     * This is typically required when the MapReduce job needs to be serialized to another machine.
+     * Note that what is stored is simply the instance state, not any processed data.
+     *
+     * @param configuration the configuration to store the state of the MapReduce job in.
+     */
+    public default void storeState(final Configuration configuration) {
+        configuration.setProperty(MAP_REDUCE, this.getClass().getName());
+    }
+
+    /**
+     * When it is necessary to load the state of a MapReduce job, this method is called.
+     * This is typically required when the MapReduce job needs to be serialized to another machine.
+     * Note that what is loaded is simply the instance state, not any processed data.
+     * <p/>
+     * It is important that the state loaded from loadState() is identical to any state created from a constructor.
+     * For those GraphComputers that do not need to use Configurations to migrate state between JVMs, only the constructor will be used.
+     *
+     * @param configuration the configuration to load the state of the MapReduce job from.
+     */
+    public default void loadState(final Configuration configuration) {
+
+    }
+
+    /**
+     * A MapReduce job can be map-only, map-reduce-only, or map-combine-reduce.
+     * Before executing the particular stage, this method is called to determine if the respective stage is defined.
+     * This method should return true if the respective stage has a non-default method implementation.
+     *
+     * @param stage the stage to check for definition.
+     * @return whether that stage should be executed.
+     */
+    public boolean doStage(final Stage stage);
+
+    /**
+     * The map() method is logically executed at all vertices in the graph in parallel.
+     * The map() method emits key/value pairs given some analysis of the data in the vertices (and/or its incident edges).
+     *
+     * @param vertex  the current vertex being map() processed.
+     * @param emitter the component that allows for key/value pairs to be emitted to the next stage.
+     */
+    public default void map(final Vertex vertex, final MapEmitter<MK, MV> emitter) {
+    }
+
+    /**
+     * The combine() method is logically executed at all "machines" in parallel.
+     * The combine() method pre-combines the values for a key prior to propagation over the wire.
+     * The combine() method must emit the same key/value pairs as the reduce() method.
+     * If there is a combine() implementation, there must be a reduce() implementation.
+     * If the MapReduce implementation is single machine, it can skip executing this method as reduce() is sufficient.
+     *
+     * @param key     the key that has aggregated values
+     * @param values  the aggregated values associated with the key
+     * @param emitter the component that allows for key/value pairs to be emitted to the reduce stage.
+     */
+    public default void combine(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+    }
+
+    /**
+     * The reduce() method is logically executed on the "machine" the respective key hashes to.
+     * The reduce() method combines all the values associated with the key and emits key/value pairs.
+     *
+     * @param key     the key that has aggregated values
+     * @param values  the aggregated values associated with the key
+     * @param emitter the component that allows for key/value pairs to be emitted as the final result.
+     */
+    public default void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter) {
+    }
+
+    /**
+     * If a {@link Comparator} is provided, then all pairs leaving the {@link MapEmitter} are sorted.
+     * The sorted results are either fed to the combine/reduce-stage or emitted as the final output.
+     * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
+     * The default implementation returns {@link Optional#empty}.
+     *
+     * @return an {@link Optional} of a comparator for sorting the map output.
+     */
+    public default Optional<Comparator<MK>> getMapKeySort() {
+        return Optional.empty();
+    }
+
+    /**
+     * If a {@link Comparator} is provided, then all pairs leaving the {@link ReduceEmitter} are sorted.
+     * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
+     * The default implementation returns {@link Optional#empty}.
+     *
+     * @return an {@link Optional} of a comparator for sorting the reduce output.
+     */
+    public default Optional<Comparator<RK>> getReduceKeySort() {
+        return Optional.empty();
+    }
+
+    /**
+     * The key/value pairs emitted by reduce() (or map() in a map-only job) can be iterated to generate a local JVM Java object.
+     *
+     * @param keyValues the key/value pairs that were emitted from reduce() (or map() in a map-only job)
+     * @return the resultant object formed from the emitted key/values.
+     */
+    public R generateFinalResult(final Iterator<KeyValue<RK, RV>> keyValues);
+
+    /**
+     * The results of the MapReduce job are associated with a memory-key to ultimately be stored in {@link Memory}.
+     *
+     * @return the memory key of the generated result object.
+     */
+    public String getMemoryKey();
+
+    /**
+     * The final result can be generated and added to {@link Memory} and is accessible via {@link com.tinkerpop.gremlin.process.computer.util.DefaultComputerResult}.
+     * The default simply takes the object from generateFinalResult() and adds it to the Memory given getMemoryKey().
+     *
+     * @param memory    the memory of the {@link GraphComputer}
+     * @param keyValues the key/value pairs emitted from reduce() (or map() in a map only job).
+     */
+    public default void addResultToMemory(final Memory.Admin memory, final Iterator<KeyValue<RK, RV>> keyValues) {
+        memory.set(this.getMemoryKey(), this.generateFinalResult(keyValues));
+    }
+
+    /**
+     * When multiple workers on a single machine need MapReduce instances, it is possible to use clone.
+     * This will provide a speedier way of generating instances, over the {@link MapReduce#storeState} and {@link MapReduce#loadState} model.
+     * No default implementation is provided by this interface; a MapReduce that is a stateless singleton may simply return itself.
+     *
+     * @return A clone of the MapReduce object
+     * @throws CloneNotSupportedException
+     */
+    public MapReduce<MK, MV, RK, RV, R> clone() throws CloneNotSupportedException;
+
+    /**
+     * A helper method to construct a {@link MapReduce} given the content of the supplied configuration.
+     * The class of the MapReduce is read from the {@link MapReduce#MAP_REDUCE} static configuration key.
+     * Once the MapReduce is constructed, {@link MapReduce#loadState} method is called with the provided configuration.
+     *
+     * @param configuration A configuration with requisite information to build a MapReduce
+     * @return the newly constructed MapReduce
+     */
+    public static <M extends MapReduce<MK, MV, RK, RV, R>, MK, MV, RK, RV, R> M createMapReduce(final Configuration configuration) {
+        try {
+            final Class<M> mapReduceClass = (Class) Class.forName(configuration.getString(MAP_REDUCE));
+            final Constructor<M> constructor = mapReduceClass.getDeclaredConstructor();
+            constructor.setAccessible(true);
+            final M mapReduce = constructor.newInstance();
+            mapReduce.loadState(configuration);
+            return mapReduce;
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    //////////////////
+
+    /**
+     * The MapEmitter is used to emit key/value pairs from the map() stage of the MapReduce job.
+     * The implementation of MapEmitter is up to the vendor, not the developer.
+     *
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    public interface MapEmitter<K, V> {
+        public void emit(final K key, final V value);
+
+        /**
+         * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
+         *
+         * @param value the value to emit.
+         */
+        public default void emit(final V value) {
+            this.emit((K) MapReduce.NullObject.instance(), value);
+        }
+    }
+
+    /**
+     * The ReduceEmitter is used to emit key/value pairs from the combine() and reduce() stages of the MapReduce job.
+     * The implementation of ReduceEmitter is up to the vendor, not the developer.
+     *
+     * @param <OK> the key type
+     * @param <OV> the value type
+     */
+    public interface ReduceEmitter<OK, OV> {
+        public void emit(final OK key, OV value);
+
+        /**
+         * A default method that assumes the key is {@link com.tinkerpop.gremlin.process.computer.MapReduce.NullObject}.
+         *
+         * @param value the value to emit.
+         */
+        public default void emit(final OV value) {
+            this.emit((OK) MapReduce.NullObject.instance(), value);
+        }
+    }
+
+    //////////////////
+
+    /**
+     * A convenience singleton when a single key is needed so that all emitted values converge to the same combiner/reducer.
+     */
+    public static class NullObject implements Comparable<NullObject>, Serializable {
+        private static final NullObject INSTANCE = new NullObject();
+        private static final String NULL_OBJECT = "";
+
+        public static NullObject instance() {
+            return INSTANCE;
+        }
+
+        @Override
+        public int hashCode() {
+            return 666666666;
+        }
+
+        @Override
+        public boolean equals(final Object object) {
+            return object instanceof NullObject;
+        }
+
+        @Override
+        public int compareTo(final NullObject nullObject) {
+            return 0;
+        }
+
+        @Override
+        public String toString() {
+            return NULL_OBJECT;
+        }
+
+        private void readObject(final ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
+
+        }
+
+        private void writeObject(final ObjectOutputStream outputStream) throws IOException {
+
+        }
+    }
+}