You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:43 UTC

[17/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop directories to org/apache/tinkerpop

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java
new file mode 100644
index 0000000..7928345
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.map.match;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.B_O_PA_S_SE_SL_Traverser;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Stack;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public final class MatchStep<S, E> extends AbstractStep<S, Map<String, E>> implements TraversalParent {
+
+    static final BiConsumer<String, Object> TRIVIAL_CONSUMER = (s, t) -> {
+    };
+
+    private static final String ANON_LABEL_PREFIX = "_";
+
+    // optimize before processing each start object, by default
+    private static final int DEFAULT_STARTS_PER_OPTIMIZE = 1;
+
+    private final String startLabel;
+    private final Map<String, List<TraversalWrapper<S, S>>> traversalsByStartAs;
+    private final List<Traversal> traversals = new ArrayList<>();
+
+    private int startsPerOptimize = DEFAULT_STARTS_PER_OPTIMIZE;
+    private int optimizeCounter = -1;
+    private int anonLabelCounter = 0;
+
+    private Enumerator<S> currentSolution;
+    private int currentIndex;
+
+    // initial value allows MatchStep to be used as a stand-alone query engine
+    private Traverser.Admin<S> currentStart;
+
+    public MatchStep(final Traversal.Admin traversal, final String startLabel, final Traversal... traversals) {
+        super(traversal);
+        this.startLabel = startLabel;
+        this.traversalsByStartAs = new HashMap<>();
+        this.currentStart = new B_O_PA_S_SE_SL_Traverser<>(null, this);
+        for (final Traversal tl : traversals) {
+            addTraversalPrivate(tl);
+            this.integrateChild(tl.asAdmin(), TYPICAL_LOCAL_OPERATIONS);
+            this.traversals.add(tl);
+        }
+        checkSolvability();
+    }
+
+    /**
+     * @return the traverser requirements of this step together with those of its child traversals,
+     * as computed by {@code getSelfAndChildRequirements()}
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        final Set<TraverserRequirement> requirements = this.getSelfAndChildRequirements();
+        return requirements;
+    }
+
+    /**
+     * @return the step string built from this step and its traversals, indexed by start label
+     */
+    @Override
+    public String toString() {
+        final String description = TraversalHelper.makeStepString(this, this.traversalsByStartAs);
+        return description;
+    }
+
+    /**
+     * Extends an already-constructed MatchStep with one more traversal.
+     * The query is re-checked after the addition and must be solvable as it stands, i.e. without
+     * any further traversals being added.
+     * Call this method before the query is executed for the first time.
+     *
+     * @param traversal the traversal to add
+     * @throws IllegalArgumentException if the traversal has no start label or uses a reserved label,
+     *                                  or if the query is not solvable after the addition
+     */
+    public void addTraversal(final Traversal<S, S> traversal) {
+        // index the traversal by its start label
+        this.addTraversalPrivate(traversal);
+        // expose it as a local child and wire it into this step
+        this.traversals.add(traversal);
+        this.integrateChild(traversal.asAdmin(), TYPICAL_LOCAL_OPERATIONS);
+        this.checkSolvability();
+    }
+
+    /**
+     * Sets how many start objects are consumed between successive re-optimizations of the query plan.
+     * With a value of 1, the plan is optimized before each start object is processed.
+     *
+     * @param startsPerOptimize the number of starts per optimization; must be at least 1
+     * @throws IllegalArgumentException if {@code startsPerOptimize} is less than 1
+     */
+    public void setStartsPerOptimize(final int startsPerOptimize) {
+        if (startsPerOptimize < 1) {
+            throw new IllegalArgumentException("startsPerOptimize must be at least 1, but was "
+                    + startsPerOptimize);
+        }
+        this.startsPerOptimize = startsPerOptimize;
+    }
+
+    /**
+     * Produces the next solution of the match query as a map from (non-anonymous) labels to values.
+     * Solutions of the current start are emitted one by one; when they are exhausted, the next start
+     * is pulled, the query plan is re-optimized if due, and solving begins again for that start.
+     *
+     * @return a traverser, split from the start which produced the solution, holding the solution map
+     * @throws NoSuchElementException if the current solution is exhausted and there are no more starts
+     */
+    @Override
+    protected Traverser<Map<String, E>> processNextStart() throws NoSuchElementException {
+        final Map<String, E> map = new HashMap<>();
+        final BiConsumer<String, S> resultSetter = (name, value) -> map.put(name, (E) value);
+
+        while (true) { // break out when the current solution is exhausted and there are no more starts
+            if (null == this.currentSolution) {
+                if (!this.starts.hasNext()) {
+                    throw FastNoSuchElementException.instance();
+                }
+
+                // re-optimize the query plan once every startsPerOptimize starts
+                this.optimizeCounter = (this.optimizeCounter + 1) % this.startsPerOptimize;
+                if (0 == this.optimizeCounter) {
+                    optimize();
+                }
+
+                this.currentStart = this.starts.next();
+                this.currentSolution = solveFor(IteratorUtils.of(this.currentStart.get()));
+                this.currentIndex = 0;
+            }
+
+            map.clear();
+            if (this.currentSolution.visitSolution(this.currentIndex++, resultSetter)) {
+                // split only now, so that the result descends from the start which actually produced
+                // this solution rather than from the previous start (or the initial placeholder)
+                return this.currentStart.split(map, this);
+            }
+
+            // no solution at this index: the current start is exhausted
+            this.currentSolution = null;
+        }
+    }
+
+    /**
+     * Describes the current state of this step: the query plan, as a tree of traversals rooted at the
+     * start label, annotated with the costs computed from the statistics gathered so far.
+     *
+     * @return a multi-line, tab-indented description of the query plan
+     */
+    public String summarize() {
+        final StringBuilder summary = new StringBuilder("match \"");
+        summary.append(this.startLabel);
+        summary.append("\":\t");
+        summary.append(findCost(this.startLabel));
+        summary.append('\n');
+
+        final Set<String> visited = new HashSet<>();
+        summarize(this.startLabel, summary, visited, 1);
+        return summary.toString();
+    }
+
+    // Appends one line per traversal leaving outLabel, then recurses into the end label of each.
+    // Each label is expanded at most once; "indent" is the number of leading tabs for this level.
+    private void summarize(final String outLabel,
+                           final StringBuilder sb,
+                           final Set<String> visited,
+                           final int indent) {
+        if (!visited.add(outLabel)) {
+            // already expanded elsewhere in the summary
+            return;
+        }
+
+        final List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(outLabel);
+        if (null == outs) {
+            return;
+        }
+
+        for (final TraversalWrapper<S, S> wrapper : outs) {
+            for (int tabs = indent; tabs > 0; tabs--) {
+                sb.append('\t');
+            }
+            sb.append(outLabel)
+                    .append("->")
+                    .append(wrapper.endLabel)
+                    .append(":\t")
+                    .append(findCost(wrapper))
+                    .append('\t')
+                    .append(wrapper)
+                    .append('\n');
+            summarize(wrapper.endLabel, sb, visited, indent + 1);
+        }
+    }
+
+    private void addTraversalPrivate(final Traversal<S, S> traversal) {
+
+        String startAs = traversal.asAdmin().getStartStep().getLabel().orElseThrow(()
+                -> new IllegalArgumentException("All match traversals must have their start step labeled with as()"));
+        String endAs = traversal.asAdmin().getEndStep().getLabel().orElse(null);
+        checkAs(startAs);
+        if (null == endAs) {
+            endAs = createAnonymousAs();
+        } else {
+            checkAs(endAs);
+        }
+
+        final TraversalWrapper<S, S> wrapper = new TraversalWrapper<>(traversal, startAs, endAs);
+        // index all wrapped traversals by their startLabel
+        List<TraversalWrapper<S, S>> l2 = this.traversalsByStartAs.get(startAs);
+        if (null == l2) {
+            l2 = new LinkedList<>();
+            this.traversalsByStartAs.put(startAs, l2);
+        }
+        l2.add(wrapper);
+    }
+
+    // Given all the wrapped traversals, walks the query depth-first from the start label and throws
+    // an IllegalArgumentException if the query is not solvable, i.e. if it contains a cycle or a
+    // traversal which cannot be reached from the start label.
+    private void checkSolvability() {
+        // labels on the current depth-first path; meeting one of them again means a cycle
+        final Set<String> pathSet = new HashSet<>();
+        // labels whose out-traversals have been completely explored; they are never expanded again, so
+        // each traversal is counted exactly once even if its start label is reachable by several paths
+        final Set<String> explored = new HashSet<>();
+        final Stack<String> stack = new Stack<>();
+        stack.push(this.startLabel);
+        int countTraversals = 0;
+        while (!stack.isEmpty()) {
+            final String outAs = stack.peek();
+            if (pathSet.contains(outAs)) {
+                // everything below this label has been handled; leave the current path
+                stack.pop();
+                pathSet.remove(outAs);
+                explored.add(outAs);
+            } else if (explored.contains(outAs)) {
+                // reached again via another path; already verified and counted
+                stack.pop();
+            } else {
+                pathSet.add(outAs);
+                final List<TraversalWrapper<S, S>> l = traversalsByStartAs.get(outAs);
+                if (null != l) {
+                    for (final TraversalWrapper<S, S> tw : l) {
+                        countTraversals++;
+                        if (pathSet.contains(tw.endLabel)) {
+                            throw new IllegalArgumentException("The provided traversal set contains a cycle due to '"
+                                    + tw.endLabel + '\'');
+                        }
+                        stack.push(tw.endLabel);
+                    }
+                }
+            }
+        }
+
+        int totalTraversals = 0;
+        for (final List<TraversalWrapper<S, S>> l : this.traversalsByStartAs.values()) {
+            totalTraversals += l.size();
+        }
+
+        // countTraversals now equals the number of traversals reachable from the start label
+        if (countTraversals < totalTraversals) {
+            throw new IllegalArgumentException("The provided traversal set contains unreachable as-label(s)");
+        }
+    }
+
+    // Rejects explicit labels which use the prefix reserved for generated (anonymous) labels.
+    // note: this won't happen so long as the anon prefix is the same as Traversal.UNDERSCORE
+    private static void checkAs(final String as) {
+        if (!isAnonymousAs(as)) {
+            return;
+        }
+        throw new IllegalArgumentException("The step named '" + as + "' uses reserved prefix '"
+                + ANON_LABEL_PREFIX + '\'');
+    }
+
+    private static boolean isAnonymousAs(final String as) {
+        return as.startsWith(ANON_LABEL_PREFIX);
+    }
+
+    // Generates the next anonymous label: the reserved prefix followed by a counter starting at 1.
+    private String createAnonymousAs() {
+        this.anonLabelCounter++;
+        return ANON_LABEL_PREFIX + this.anonLabelCounter;
+    }
+
+    /**
+     * Applies this match query directly to a sequence of inputs, each of which is bound to the start label.
+     *
+     * @param inputs a sequence of inputs
+     * @return an enumerator over all solutions
+     */
+    public Enumerator<S> solveFor(final Iterator<S> inputs) {
+        final Enumerator<S> solutions = solveFor(this.startLabel, inputs);
+        return solutions;
+    }
+
+    private Enumerator<S> solveFor(final String localStartAs,
+                                   final Iterator<S> inputs) {
+        List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(localStartAs);
+        if (null == outs) {
+            // no out-traversals from here; just enumerate the values bound to localStartAs
+            return isAnonymousAs(localStartAs)
+                    ? new SimpleEnumerator<>(localStartAs, inputs)
+                    : new IteratorEnumerator<>(localStartAs, inputs);
+        } else {
+            // for each value bound to localStartAs, feed it into all out-traversals in parallel and join the results
+            return new SerialEnumerator<>(localStartAs, inputs, o -> {
+                Enumerator<S> result = null;
+                Set<String> leftLabels = new HashSet<>();
+
+                for (TraversalWrapper<S, S> w : outs) {
+                    TraversalUpdater<S, S> updater
+                            = new TraversalUpdater<>(w, IteratorUtils.of(o), currentStart, this.getId());
+
+                    Set<String> rightLabels = new HashSet<>();
+                    addVariables(w.endLabel, rightLabels);
+                    Enumerator<S> ie = solveFor(w.endLabel, updater);
+                    result = null == result ? ie : crossJoin(result, ie, leftLabels, rightLabels);
+                    leftLabels.addAll(rightLabels);
+                }
+
+                return result;
+            });
+        }
+    }
+
+    private static <T> Enumerator<T> crossJoin(final Enumerator<T> left,
+                                               final Enumerator<T> right,
+                                               final Set<String> leftLabels,
+                                               final Set<String> rightLabels) {
+        Set<String> shared = new HashSet<>();
+        for (String s : rightLabels) {
+            if (leftLabels.contains(s)) {
+                shared.add(s);
+            }
+        }
+
+        Enumerator<T> cj = new CrossJoinEnumerator<>(left, right);
+        return shared.size() > 0 ? new InnerJoinEnumerator<>(cj, shared) : cj;
+    }
+
+    // recursively add all non-anonymous variables from a starting point in the query
+    private void addVariables(final String localStartAs,
+                              final Set<String> variables) {
+        if (!isAnonymousAs(localStartAs)) {
+            variables.add(localStartAs);
+        }
+
+        List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(localStartAs);
+        if (null != outs) {
+            for (TraversalWrapper<S, S> w : outs) {
+                String endAs = w.endLabel;
+                if (!variables.contains(endAs)) {
+                    addVariables(endAs, variables);
+                }
+            }
+        }
+    }
+
+    // Passes a name/value binding on to the visitor, unless the name is an anonymous label,
+    // in which case the binding is silently dropped.
+    static <T> void visit(final String name,
+                          final T value,
+                          final BiConsumer<String, T> visitor) {
+        if (isAnonymousAs(name)) {
+            return;
+        }
+        visitor.accept(name, value);
+    }
+
+    /**
+     * Computes a new query plan from the statistics gathered so far about traversal inputs and outputs,
+     * and applies it by re-ordering the traversals at each label, beginning at the start label.
+     */
+    // note: optimize() is never called from within a solution iterator, as it changes the query plan
+    public void optimize() {
+        this.optimizeAt(this.startLabel);
+    }
+
+    private void optimizeAt(final String outAs) {
+        List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(outAs);
+        if (null != outs) {
+            for (TraversalWrapper<S, S> t : outs) {
+                optimizeAt(t.endLabel);
+                updateOrderingFactor(t);
+            }
+            Collections.sort(outs);
+        }
+    }
+
+    private double findCost(final TraversalWrapper<S, S> root) {
+        double bf = root.findBranchFactor();
+        return bf + findCost(root.endLabel, root.findBranchFactor());
+    }
+
+    private double findCost(final String outAs,
+                            final double branchFactor) {
+        double bf = branchFactor;
+
+        double cost = 0;
+
+        List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(outAs);
+        if (null != outs) {
+            for (TraversalWrapper<S, S> child : outs) {
+                cost += bf * findCost(child);
+                bf *= child.findBranchFactor();
+            }
+        }
+
+        return cost;
+    }
+
+    /**
+     * Finds the expected cost of applying a branch of the current query plan to a single start value.
+     *
+     * @param outLabel the out-label of one or more traversals in the query
+     * @return the expected cost, in the current query plan, of applying the branch of the query plan at
+     * the given out-label to one start value
+     */
+    public double findCost(final String outLabel) {
+        // a single start value corresponds to an initial branch factor of 1
+        final double singleStart = 1.0;
+        return findCost(outLabel, singleStart);
+    }
+
+    private void updateOrderingFactor(final TraversalWrapper<S, S> w) {
+        w.orderingFactor = ((w.findBranchFactor() - 1) / findCost(w));
+    }
+
+    /**
+     * @return the child traversals of this step, in the order in which they were added
+     */
+    @Override
+    public List<Traversal> getLocalChildren() {
+        final List<Traversal> children = this.traversals;
+        return children;
+    }
+
+    /**
+     * Wraps a traversal of a query and keeps statistics about it: the total number of inputs it has
+     * consumed and the total number of outputs it has produced.
+     * The ratio of the two, the "branch factor", is an important factor in determining the place of
+     * the traversal in the query plan. Wrappers are ordered by their ordering factor.
+     */
+    // note: input and output counts are never "refreshed".
+    // The position of a traversal in a query never changes, although its priority / likelihood of being executed does.
+    // Priority in turn affects branch factor.
+    // However, with sufficient inputs and optimizations, the branch factor is expected to converge on a stable value.
+    public static class TraversalWrapper<A, B> implements Comparable<TraversalWrapper<A, B>> {
+        private final Traversal<A, B> traversal;
+        private final String startLabel;
+        private final String endLabel;
+        private int totalInputs = 0;
+        private int totalOutputs = 0;
+        private double orderingFactor;
+
+        public TraversalWrapper(final Traversal<A, B> traversal,
+                                final String startLabel,
+                                final String endLabel) {
+            this.traversal = traversal;
+            this.startLabel = startLabel;
+            this.endLabel = endLabel;
+        }
+
+        /**
+         * Records that one more input has been processed by the traversal.
+         */
+        public void incrementInputs() {
+            this.totalInputs += 1;
+        }
+
+        /**
+         * Records additional outputs produced by the traversal.
+         *
+         * @param outputs the number of outputs to add to the total
+         */
+        public void incrementOutputs(int outputs) {
+            this.totalOutputs = this.totalOutputs + outputs;
+        }
+
+        /**
+         * @return the average number of outputs per input observed so far, or 1 if no input has been recorded
+         */
+        // TODO: take variance into account, to avoid penalizing traversals for early encounters with super-inputs,
+        // or simply for never having been tried
+        public double findBranchFactor() {
+            if (0 == this.totalInputs) {
+                return 1;
+            }
+            return this.totalOutputs / ((double) this.totalInputs);
+        }
+
+        /**
+         * Orders wrappers by ascending ordering factor.
+         */
+        @Override
+        public int compareTo(final TraversalWrapper<A, B> other) {
+            return Double.compare(this.orderingFactor, other.orderingFactor);
+        }
+
+        public Traversal<A, B> getTraversal() {
+            return this.traversal;
+        }
+
+        /**
+         * Resets the wrapped traversal; the gathered statistics are left untouched.
+         */
+        public void reset() {
+            this.traversal.asAdmin().reset();
+        }
+
+        @Override
+        public String toString() {
+            // a more verbose form, kept for debugging:
+            //return "[" + this.startLabel + "->" + this.endLabel + "," + findBranchFactor() + ","
+            // + this.totalInputs + "," + this.totalOutputs + "," + this.traversal + "]";
+            return this.traversal.toString();
+        }
+    }
+
+    /**
+     * A helper object which wraps a traversal, submitting starts and counting results per start.
+     * It iterates over the outputs of the wrapped traversal and, as a side-effect, updates the
+     * input and output counts of the traversal's wrapper.
+     */
+    public static class TraversalUpdater<A, B> implements Iterator<B> {
+        private final TraversalWrapper<A, B> w;
+        // number of outputs produced for the input currently being processed;
+        // -1 until the traversal first asks for an input
+        private int outputs = -1;
+
+        /**
+         * Resets the wrapped traversal and feeds it the given inputs as fresh starts.
+         *
+         * @param w      the wrapper of the traversal to execute
+         * @param inputs the values to submit to the traversal
+         * @param start  the traverser from which a new traverser is split for each input
+         * @param as     not used by this constructor
+         */
+        public TraversalUpdater(final TraversalWrapper<A, B> w,
+                                final Iterator<A> inputs,
+                                final Traverser<A> start,
+                                final String as) {
+            this.w = w;
+
+            // the side-effect runs whenever the traversal asks whether there is another input
+            Iterator<A> seIter = new SideEffectIterator<>(inputs, ignored -> {
+                // only increment traversal input and output counts once an input
+                // has been completely processed by the traversal
+                if (-1 != outputs) {
+                    w.incrementInputs();
+                    w.incrementOutputs(outputs);
+                }
+                // start counting afresh for the next input
+                outputs = 0;
+            });
+            // turn each input value into a traverser split from the given start
+            Iterator<Traverser<A>> starts = new MapIterator<>(seIter,
+                    o -> {
+                        final Traverser.Admin<A> traverser = ((Traverser.Admin<A>) start).split();
+                        traverser.set((A) o);
+                        return traverser;
+                    });
+
+            w.reset();
+
+            // with the traversal "empty" and ready for re-use, add new starts
+            w.traversal.asAdmin().addStarts(starts);
+        }
+
+        // note: may return true after first returning false (inheriting this behavior from e.g. DefaultTraversal)
+        @Override
+        public boolean hasNext() {
+            return w.traversal.hasNext();
+        }
+
+        @Override
+        public B next() {
+            // counted before the output is fetched from the traversal
+            outputs++;
+            B b = w.traversal.next();
+
+            // immediately check hasNext(), possibly updating the traverser's statistics
+            // even if we otherwise abandon the iterator
+            w.traversal.hasNext();
+
+            return b;
+        }
+    }
+
+    // An iterator which runs a side-effect the first time hasNext() is called, and again the first
+    // time hasNext() is called after each call to next(). The side-effect is always passed null.
+    private static class SideEffectIterator<T> implements Iterator<T> {
+        private final Consumer onHasNext;
+        private final Iterator<T> baseIterator;
+        // true while the side-effect is still due on the next call to hasNext()
+        private boolean ready = true;
+
+        private SideEffectIterator(final Iterator<T> baseIterator,
+                                   final Consumer onHasNext) {
+            this.baseIterator = baseIterator;
+            this.onHasNext = onHasNext;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (this.ready) {
+                this.runSideEffect();
+            }
+            return this.baseIterator.hasNext();
+        }
+
+        @Override
+        public T next() {
+            final T item = this.baseIterator.next();
+            // re-arm the side-effect only once an element has actually been delivered
+            this.ready = true;
+            return item;
+        }
+
+        // runs the side-effect, then disarms it until the next call to next()
+        private void runSideEffect() {
+            this.onHasNext.accept(null);
+            this.ready = false;
+        }
+    }
+
+    // An iterator which lazily applies a function to each element of an underlying iterator.
+    private static class MapIterator<A, B> implements Iterator<B> {
+        private final Iterator<A> source;
+        private final Function<A, B> mapper;
+
+        public MapIterator(final Iterator<A> baseIterator, final Function<A, B> map) {
+            this.source = baseIterator;
+            this.mapper = map;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.source.hasNext();
+        }
+
+        @Override
+        public B next() {
+            final A original = this.source.next();
+            return this.mapper.apply(original);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java
new file mode 100644
index 0000000..42570a3
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.map.match;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * An enumerator which consumes values from an iterator and maps each value to a secondary enumerator
+ * (for example, a join).
+ * Enumerated indices cover all solutions in the secondary enumerators,
+ * in ascending order according to the value iterator and the enumerators' own indices.
+ * Values are pulled from the iterator lazily, only once the solutions of the secondary enumerators
+ * created so far do not reach the requested index.
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SerialEnumerator<T> implements Enumerator<T> {
+    private final String name;
+    private final Function<T, Enumerator<T>> constructor;
+
+    // secondary enumerators created so far, and the value from which each was created (same indices)
+    private final List<Enumerator<T>> memory = new ArrayList<>();
+    private final List<T> values = new ArrayList<>();
+
+    // remaining values; set to null once exhausted
+    private Iterator<T> iterator;
+
+    // TODO: this is only assigned, not accessed, until the efficient implementation of size() is restored
+    private int completedEnumsSize = 0;
+
+    public SerialEnumerator(final String name,
+                            final Iterator<T> iterator,
+                            final Function<T, Enumerator<T>> constructor) {
+        this.name = name;
+        this.iterator = iterator;
+        this.constructor = constructor;
+    }
+
+    /**
+     * @return the sum of the sizes of all secondary enumerators created so far
+     */
+    @Override
+    public int size() {
+        // TODO: restore the more efficient implementation of size() while taking into account that
+        // traversal iterators such as DefaultTraversal may return hasNext=true after first returning hasNext=false.
+        // That implementation returned completedEnumsSize plus the size of the last enumerator in memory.
+        int total = 0;
+        for (final Enumerator<T> enumerator : memory) {
+            total += enumerator.size();
+        }
+        return total;
+    }
+
+    /**
+     * Visits the solution at the given index: the bindings of the secondary enumerator which covers
+     * the index, followed by the binding of this enumerator's name to the value from which that
+     * secondary enumerator was created.
+     *
+     * @param index   the index of the solution, counted across all secondary enumerators
+     * @param visitor receives the bindings of the solution
+     * @return whether a solution exists at the given index
+     */
+    // note: *not* intended for random access; use binary search if this is ever needed
+    @Override
+    public boolean visitSolution(final int index,
+                                 final BiConsumer<String, T> visitor) {
+        int totalSize = 0;
+        int memIndex = 0;
+        while (true) {
+            if (memIndex < memory.size()) {
+                final Enumerator<T> current = memory.get(memIndex);
+
+                if (current.visitSolution(index - totalSize, visitor)) {
+                    // additionally, bind the value stored in this enumerator
+                    MatchStep.visit(name, values.get(memIndex), visitor);
+                    return true;
+                }
+
+                // the index lies beyond this enumerator; skip over its solutions
+                totalSize += current.size();
+                memIndex++;
+                continue;
+            }
+
+            // all existing enumerators have been tried; a further value is needed
+            if (null == iterator) {
+                return false;
+            }
+            if (!iterator.hasNext()) {
+                // free up memory as soon as possible
+                iterator = null;
+                return false;
+            }
+
+            if (!memory.isEmpty()) {
+                final int lastSize = memory.get(memIndex - 1).size();
+
+                if (0 == lastSize) {
+                    // drop the most recently created enumeration if it is empty, to avoid wasted space
+                    // (per the original author, it is the only one which will ever be empty)
+                    memIndex--;
+                    memory.remove(memIndex);
+                    values.remove(memIndex);
+                } else {
+                    completedEnumsSize += lastSize;
+                }
+            }
+
+            // create the next secondary enumerator; it is tried on the next pass of the loop
+            final T value = iterator.next();
+            values.add(value);
+            final Enumerator<T> created = constructor.apply(value);
+            memory.add(created);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java
new file mode 100644
index 0000000..4f1dd8e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.map.match;
+
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+/**
+ * An enumerator of at most one element: the first element of the given iterator, if there is one.
+ * The iterator is consulted lazily, the first time that the solution at index 0 is visited.
+ * A null element is treated the same as no element.
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SimpleEnumerator<T> implements Enumerator<T> {
+
+    private final String name;
+    // set to null once the first element has been requested
+    private Iterator<T> iterator;
+    // the single element of this enumerator, once found
+    private T element;
+
+    public SimpleEnumerator(final String name,
+                            final Iterator<T> iterator) {
+        this.name = name;
+        this.iterator = iterator;
+    }
+
+    /**
+     * @return 1 if an element has been found so far, otherwise 0
+     */
+    @Override
+    public int size() {
+        if (null == element) {
+            return 0;
+        }
+        return 1;
+    }
+
+    /**
+     * Visits the single solution of this enumerator, which binds the name to the element.
+     *
+     * @param index   the index of the solution; only index 0 can have a solution
+     * @param visitor receives the binding
+     * @return whether a solution exists at the given index
+     */
+    @Override
+    public boolean visitSolution(int index, BiConsumer<String, T> visitor) {
+        if (0 != index) {
+            return false;
+        }
+
+        fetchElementOnce();
+
+        if (null == element) {
+            return false;
+        }
+        MatchStep.visit(name, element, visitor);
+        return true;
+    }
+
+    // takes the first element from the iterator, if any, then lets go of the iterator
+    private void fetchElementOnce() {
+        if (null == iterator) {
+            return;
+        }
+        if (iterator.hasNext()) {
+            element = iterator.next();
+        }
+        iterator = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java
new file mode 100644
index 0000000..68f7049
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.structure.Direction;
+import com.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * A side-effect step that adds an edge between the traverser's current vertex and the vertex
+ * stored in the traverser's path under a given step label.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AddEdgeStep extends SideEffectStep<Vertex> implements Reversible {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS =
+            EnumSet.of(TraverserRequirement.PATH, TraverserRequirement.OBJECT);
+
+    // TODO: Weight key based on Traverser.getCount() ?
+
+    private final Direction direction;
+    private final String edgeLabel;
+    private final String stepLabel;
+    private final Object[] propertyKeyValues;
+
+    /**
+     * @param traversal         the traversal this step belongs to
+     * @param direction         which way the new edge(s) point relative to the current vertex
+     * @param edgeLabel         the label given to every edge that is added
+     * @param stepLabel         the path label under which the other vertex is looked up
+     * @param propertyKeyValues passed through unchanged to {@code Vertex.addEdge}
+     */
+    public AddEdgeStep(final Traversal.Admin traversal, final Direction direction, final String edgeLabel, final String stepLabel, final Object... propertyKeyValues) {
+        super(traversal);
+        this.direction = direction;
+        this.edgeLabel = edgeLabel;
+        this.stepLabel = stepLabel;
+        this.propertyKeyValues = propertyKeyValues;
+    }
+
+    /**
+     * Adds the edge(s) for one traverser. With {@code Direction.BOTH} two edges are added:
+     * first the one leaving the labeled vertex, then the one leaving the current vertex.
+     */
+    @Override
+    protected void sideEffect(final Traverser.Admin<Vertex> traverser) {
+        final Vertex current = traverser.get();
+        final Vertex labeled = traverser.path().get(this.stepLabel);
+        final boolean both = this.direction.equals(Direction.BOTH);
+
+        // IN (or BOTH): the edge starts at the labeled vertex and ends at the current one
+        if (both || this.direction.equals(Direction.IN)) {
+            labeled.addEdge(this.edgeLabel, current, this.propertyKeyValues);
+        }
+        // OUT (or BOTH): the edge starts at the current vertex and ends at the labeled one
+        if (both || this.direction.equals(Direction.OUT)) {
+            current.addEdge(this.edgeLabel, labeled, this.propertyKeyValues);
+        }
+    }
+
+    @Override
+    public String toString() {
+        final String directionName = this.direction.name();
+        return TraversalHelper.makeStepString(this, directionName, this.edgeLabel, this.stepLabel);
+    }
+
+    /**
+     * @return the fixed requirement set of this step: {@code PATH} and {@code OBJECT}
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
new file mode 100644
index 0000000..7b289f9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.AggregateMapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.util.CollectingBarrierStep;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.TraverserSet;
+import com.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A collecting barrier step that adds, for every traverser in the barrier, the result of a
+ * child traversal (identity by default) to a collection held in the traversal's side effects.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AggregateStep<S> extends CollectingBarrierStep<S> implements SideEffectRegistrar, SideEffectCapable, Reversible, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+    private Traversal.Admin<S, Object> aggregateTraversal = new IdentityTraversal<>();
+    private String sideEffectKey;
+
+    /**
+     * @param traversal     the traversal this step belongs to
+     * @param sideEffectKey the side-effect key of the collection; may be null, in which case
+     *                      {@link #registerSideEffects()} falls back to the step id
+     */
+    public AggregateStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+    }
+
+    /**
+     * Defaults the side-effect key to the step id when none was given and registers a
+     * {@code BulkSetSupplier} under that key unless a supplier is already registered.
+     */
+    @Override
+    public void registerSideEffects() {
+        if (null == this.sideEffectKey) this.sideEffectKey = this.getId();
+        this.getTraversal().asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.aggregateTraversal);
+    }
+
+    @Override
+    public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+        return new AggregateMapReduce(this);
+    }
+
+    /**
+     * Replaces the child traversal whose result is aggregated.
+     *
+     * @param traversal the new child traversal
+     */
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> traversal) {
+        this.aggregateTraversal = this.integrateChild(traversal, TYPICAL_LOCAL_OPERATIONS);
+    }
+
+    /**
+     * @return a singleton list holding the aggregate child traversal
+     */
+    @Override
+    public List<Traversal.Admin<S, Object>> getLocalChildren() {
+        return Collections.singletonList(this.aggregateTraversal);
+    }
+
+    /**
+     * For each traverser in the set, applies the child traversal to it and adds the result,
+     * together with the traverser's bulk, to the collection stored under the side-effect key.
+     */
+    @Override
+    public void barrierConsumer(final TraverserSet<S> traverserSet) {
+        traverserSet.forEach(traverser ->
+                TraversalHelper.addToCollection(
+                        traverser.getSideEffects().get(this.sideEffectKey),
+                        TraversalUtil.apply(traverser, this.aggregateTraversal),
+                        traverser.bulk()));
+    }
+
+    /**
+     * @return {@code BULK} and {@code SIDE_EFFECTS} combined with whatever the child traversal needs
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+    }
+
+    /**
+     * Clones this step together with its child traversal.
+     *
+     * @return a copy of this step that owns its own copy of the child traversal
+     * @throws CloneNotSupportedException if the superclass cannot be cloned
+     */
+    @Override
+    public AggregateStep<S> clone() throws CloneNotSupportedException {
+        final AggregateStep<S> clone = (AggregateStep<S>) super.clone();
+        // integrateChild is invoked on the clone (as GroupStep.clone() does) so that the copied
+        // child is integrated with the copy rather than with this, the original, step.
+        clone.aggregateTraversal = clone.integrateChild(this.aggregateTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+        return clone;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java
new file mode 100644
index 0000000..f698f3e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.TraversalEngine;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Element;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.util.iterator.EmptyIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/**
+ * A start step that emits the vertices or edges of a graph, optionally restricted to the
+ * given ids. The elements come from a replaceable iterator supplier.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphStep<S extends Element> extends StartStep<S> implements EngineDependent {
+
+    protected final Class<S> returnClass;
+    protected final Object[] ids;
+    protected transient Graph graph;
+    protected transient Supplier<Iterator<S>> iteratorSupplier;
+
+    /**
+     * @param traversal   the traversal this step belongs to
+     * @param graph       the graph whose elements are iterated
+     * @param returnClass the element class emitted; a {@code Vertex} type selects the vertex
+     *                    iterator, anything else the edge iterator
+     * @param ids         the ids handed to the graph's iterator methods
+     */
+    public GraphStep(final Traversal.Admin traversal, final Graph graph, final Class<S> returnClass, final Object... ids) {
+        super(traversal);
+        this.graph = graph;
+        this.returnClass = returnClass;
+        this.ids = ids;
+        // Default supplier: evaluated lazily, so it sees the field values at the time it is called.
+        this.iteratorSupplier = () -> (Iterator<S>) (Vertex.class.isAssignableFrom(this.returnClass) ?
+                this.graph.iterators().<S>vertexIterator(this.ids) :
+                this.graph.iterators().edgeIterator(this.ids));
+    }
+
+    public String toString() {
+        final Object idList = Arrays.asList(this.ids);
+        final String elementType = this.returnClass.getSimpleName().toLowerCase();
+        return TraversalHelper.makeStepString(this, idList, elementType);
+    }
+
+    /**
+     * @return true if the return class is {@code Vertex} or a subtype of it
+     */
+    public boolean returnsVertices() {
+        return Vertex.class.isAssignableFrom(this.returnClass);
+    }
+
+    /**
+     * @return true if the return class is {@code Edge} or a subtype of it
+     */
+    public boolean returnsEdges() {
+        return Edge.class.isAssignableFrom(this.returnClass);
+    }
+
+    public Class<S> getReturnClass() {
+        return this.returnClass;
+    }
+
+    /**
+     * Replaces the supplier that produces the iterator of starting elements.
+     *
+     * @param iteratorSupplier the new supplier
+     */
+    public void setIteratorSupplier(final Supplier<Iterator<S>> iteratorSupplier) {
+        this.iteratorSupplier = iteratorSupplier;
+    }
+
+    /**
+     * @param graphClass used only to fix the static return type; the cast itself is unchecked
+     * @return the graph of this step, cast to the requested type
+     */
+    public <G extends Graph> G getGraph(final Class<G> graphClass) {
+        return (G) this.graph;
+    }
+
+    /**
+     * @return the id array given at construction (the array itself, not a copy)
+     */
+    public Object[] getIds() {
+        return this.ids;
+    }
+
+    /**
+     * On the {@code COMPUTER} engine the supplier is replaced by one yielding an empty iterator;
+     * for any other engine nothing changes.
+     */
+    @Override
+    public void onEngine(final TraversalEngine traversalEngine) {
+        if (!traversalEngine.equals(TraversalEngine.COMPUTER)) {
+            return;
+        }
+        this.iteratorSupplier = Collections::emptyIterator;
+    }
+
+    /**
+     * While {@code first} is set, (re)initialises {@code start} from the supplier, or from an
+     * empty iterator if no supplier is present, and then delegates to the superclass.
+     */
+    @Override
+    protected Traverser<S> processNextStart() {
+        if (this.first) {
+            if (this.iteratorSupplier == null) {
+                this.start = EmptyIterator.instance();
+            } else {
+                this.start = this.iteratorSupplier.get();
+            }
+        }
+        return super.processNextStart();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java
new file mode 100644
index 0000000..288f7c8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.GroupCountMapReduce;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.MapHelper;
+import com.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A side-effect step that counts traversers per group: the group of a traverser is the result
+ * of a child traversal (identity by default), and the counts live in a map held in the
+ * traversal's side effects.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupCountStep<S> extends SideEffectStep<S> implements SideEffectRegistrar, SideEffectCapable, Reversible, TraversalParent, MapReducer<Object, Long, Object, Long, Map<Object, Long>> {
+
+    private Traversal.Admin<S, Object> groupTraversal = new IdentityTraversal<>();
+    private String sideEffectKey;
+    // TODO: onFirst like subgraph so we don't keep getting the map
+
+    /**
+     * @param traversal     the traversal this step belongs to
+     * @param sideEffectKey the side-effect key of the count map; may be null, in which case
+     *                      {@link #registerSideEffects()} falls back to the step id
+     */
+    public GroupCountStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+    }
+
+    /**
+     * Increments, by the traverser's bulk, the count stored for the group that the child
+     * traversal yields for this traverser.
+     */
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        final Map<Object, Long> groupCountMap = traverser.sideEffects(this.sideEffectKey);
+        MapHelper.incr(groupCountMap, TraversalUtil.apply(traverser.asAdmin(), this.groupTraversal), traverser.bulk());
+    }
+
+    /**
+     * Defaults the side-effect key to the step id when none was given and registers a
+     * {@code HashMapSupplier} under that key unless a supplier is already registered.
+     */
+    @Override
+    public void registerSideEffects() {
+        if (this.sideEffectKey == null) this.sideEffectKey = this.getId();
+        this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    @Override
+    public MapReduce<Object, Long, Object, Long, Map<Object, Long>> getMapReduce() {
+        return new GroupCountMapReduce(this);
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.groupTraversal);
+    }
+
+    /**
+     * Replaces the child traversal that determines a traverser's group.
+     *
+     * @param traversal the new child traversal
+     */
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> traversal) {
+        this.groupTraversal = this.integrateChild(traversal, TYPICAL_LOCAL_OPERATIONS);
+    }
+
+    /**
+     * @return a singleton list holding the grouping child traversal
+     */
+    @Override
+    public List<Traversal.Admin<S, Object>> getLocalChildren() {
+        return Collections.singletonList(this.groupTraversal);
+    }
+
+    /**
+     * @return {@code BULK} and {@code SIDE_EFFECTS} combined with whatever the child traversal needs
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+    }
+
+    /**
+     * Clones this step together with its child traversal.
+     *
+     * @return a copy of this step that owns its own copy of the child traversal
+     * @throws CloneNotSupportedException if the superclass cannot be cloned
+     */
+    @Override
+    public GroupCountStep<S> clone() throws CloneNotSupportedException {
+        final GroupCountStep<S> clone = (GroupCountStep<S>) super.clone();
+        // integrateChild is invoked on the clone (as GroupStep.clone() does) so that the copied
+        // child is integrated with the copy rather than with this, the original, step.
+        clone.groupTraversal = clone.integrateChild(this.groupTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+        return clone;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java
new file mode 100644
index 0000000..dfc7071
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.TraversalEngine;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.GroupMapReduce;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.BulkSet;
+import com.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A side-effect step that groups traversers into a map held in the traversal's side effects.
+ * A key traversal produces the map key, a value traversal produces what is collected under
+ * that key (both identity by default), and an optional reduce traversal turns each collected
+ * group into a single result.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupStep<S, K, V, R> extends SideEffectStep<S> implements SideEffectCapable, SideEffectRegistrar, TraversalParent, Reversible, EngineDependent, MapReducer<Object, Collection, Object, Object, Map> {
+
+    // Which child the next addLocalChild() call assigns: 'k' key, 'v' value, 'r' reduce, 'x' none left.
+    private char state = 'k';
+    private Traversal.Admin<S, K> keyTraversal = new IdentityTraversal<>();
+    private Traversal.Admin<S, V> valueTraversal = new IdentityTraversal<>();
+    private Traversal.Admin<Collection<V>, R> reduceTraversal = null;
+    private String sideEffectKey;
+    private boolean onGraphComputer = false;
+    // The un-reduced grouping map, remembered once a reduce has replaced the side-effect value.
+    private Map<K, Collection<V>> tempGroupByMap;
+
+    /**
+     * @param traversal     the traversal this step belongs to
+     * @param sideEffectKey the side-effect key of the group map; may be null, in which case
+     *                      {@link #registerSideEffects()} falls back to the step id
+     */
+    public GroupStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+    }
+
+    /**
+     * Adds the traverser to its group and, when not running on a graph computer, a reduce
+     * traversal is set and no further starts are pending, stores the reduced map under the
+     * side-effect key.
+     */
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        final Map<K, Collection<V>> groupByMap = null == this.tempGroupByMap ? traverser.sideEffects(this.sideEffectKey) : this.tempGroupByMap; // for nested traversals and not !starts.hasNext()
+        doGroup(traverser.asAdmin(), groupByMap, this.keyTraversal, this.valueTraversal);
+        if (!this.onGraphComputer && null != this.reduceTraversal && !this.starts.hasNext()) {
+            // Keep the un-reduced map: the side-effect value is about to be replaced by the
+            // reduced map, and later calls must keep grouping into the original one.
+            this.tempGroupByMap = groupByMap;
+            final Map<K, R> reduceMap = new HashMap<>();
+            doReduce(groupByMap, reduceMap, this.reduceTraversal);
+            traverser.sideEffects(this.sideEffectKey, reduceMap);
+        }
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    /**
+     * Defaults the side-effect key to the step id when none was given and registers a
+     * {@code HashMapSupplier} under that key unless a supplier is already registered.
+     */
+    @Override
+    public void registerSideEffects() {
+        if (this.sideEffectKey == null) this.sideEffectKey = this.getId();
+        this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+    }
+
+    /**
+     * Applies the key and value traversals to the traverser and adds the value, with the
+     * traverser's bulk, to the collection stored under the key, creating a {@code BulkSet}
+     * for a key that has no collection yet.
+     */
+    private static <S, K, V> void doGroup(final Traverser.Admin<S> traverser, final Map<K, Collection<V>> groupMap, final Traversal.Admin<S, K> keyTraversal, final Traversal.Admin<S, V> valueTraversal) {
+        final K key = TraversalUtil.apply(traverser, keyTraversal);
+        final V value = TraversalUtil.apply(traverser, valueTraversal);
+        Collection<V> values = groupMap.get(key);
+        if (null == values) {
+            values = new BulkSet<>();
+            groupMap.put(key, values);
+        }
+        // NOTE(review): judging by its name the helper unrolls iterator values into the collection - confirm.
+        TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+    }
+
+    /**
+     * Fills {@code reduceMap} with one entry per key of {@code groupMap}: the result of applying
+     * the reduce traversal to that key's collection.
+     */
+    private static <K, V, R> void doReduce(final Map<K, Collection<V>> groupMap, final Map<K, R> reduceMap, final Traversal.Admin<Collection<V>, R> reduceTraversal) {
+        groupMap.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.apply(vv, reduceTraversal)));
+    }
+
+    /**
+     * Records whether the step runs on the {@code COMPUTER} engine; when it does,
+     * {@code sideEffect} skips the local reduce.
+     */
+    @Override
+    public void onEngine(final TraversalEngine traversalEngine) {
+        this.onGraphComputer = traversalEngine.equals(TraversalEngine.COMPUTER);
+    }
+
+    @Override
+    public MapReduce<Object, Collection, Object, Object, Map> getMapReduce() {
+        return new GroupMapReduce(this);
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
+    }
+
+    /**
+     * @return the key and value traversals, followed by the reduce traversal if one is set
+     */
+    @Override
+    public <A, B> List<Traversal.Admin<A, B>> getLocalChildren() {
+        return null == this.reduceTraversal ? (List) Arrays.asList(this.keyTraversal, this.valueTraversal) : (List) Arrays.asList(this.keyTraversal, this.valueTraversal, this.reduceTraversal);
+    }
+
+    /**
+     * @return the reduce traversal, or null if none has been set
+     */
+    public Traversal.Admin<Collection<V>, R> getReduceTraversal() {
+        return this.reduceTraversal;
+    }
+
+    /**
+     * Assigns the given traversal to the next unset role, in the order key, value, reduce.
+     *
+     * @param kvrTraversal the child traversal to install
+     * @throws IllegalStateException if all three roles have already been assigned
+     */
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> kvrTraversal) {
+        if ('k' == this.state) {
+            this.keyTraversal = this.integrateChild(kvrTraversal, TYPICAL_LOCAL_OPERATIONS);
+            this.state = 'v';
+        } else if ('v' == this.state) {
+            this.valueTraversal = this.integrateChild(kvrTraversal, TYPICAL_LOCAL_OPERATIONS);
+            this.state = 'r';
+        } else if ('r' == this.state) {
+            this.reduceTraversal = this.integrateChild(kvrTraversal, TYPICAL_LOCAL_OPERATIONS);
+            this.state = 'x';
+        } else {
+            throw new IllegalStateException("The key, value, and reduce functions for group()-step have already been set");
+        }
+    }
+
+    /**
+     * @return {@code SIDE_EFFECTS} and {@code BULK} combined with whatever the child traversals need
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+    }
+
+    /**
+     * Clones this step; each child traversal is cloned and integrated through the clone.
+     * NOTE(review): {@code tempGroupByMap} is not reset here, so the clone presumably inherits
+     * whatever {@code super.clone()} copies for it - confirm that this is intended.
+     *
+     * @throws CloneNotSupportedException if the superclass cannot be cloned
+     */
+    @Override
+    public GroupStep<S, K, V, R> clone() throws CloneNotSupportedException {
+        final GroupStep<S, K, V, R> clone = (GroupStep<S, K, V, R>) super.clone();
+        clone.keyTraversal = clone.integrateChild(this.keyTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+        clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+        if (null != this.reduceTraversal)
+            clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+        return clone;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java
new file mode 100644
index 0000000..1afa1cb
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+
+import java.util.NoSuchElementException;
+
+/**
+ * A step that passes every incoming traverser on unchanged.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class IdentityStep<S> extends AbstractStep<S, S> {
+
+    /**
+     * @param traversal the traversal this step belongs to
+     */
+    public IdentityStep(final Traversal.Admin traversal) {
+        super(traversal);
+    }
+
+    /**
+     * @return the next traverser taken from the incoming starts
+     * @throws NoSuchElementException declared because the result comes straight from {@code starts.next()}
+     */
+    @Override
+    protected Traverser<S> processNextStart() throws NoSuchElementException {
+        final Traverser<S> incoming = this.starts.next();
+        return incoming;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java
new file mode 100644
index 0000000..4a03159
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.util.iterator.ArrayIterator;
+
+/**
+ * A start step whose starting objects are a fixed array of injected values.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InjectStep<S> extends StartStep<S> {
+
+    private final S[] injections;
+
+    /**
+     * @param traversal  the traversal this step belongs to
+     * @param injections the values this step starts from
+     */
+    @SafeVarargs
+    public InjectStep(final Traversal.Admin traversal, final S... injections) {
+        super(traversal);
+        this.injections = injections;
+        this.start = iteratorOver(this.injections);
+    }
+
+    /**
+     * @return a copy of this step whose start is a fresh iterator over the injections
+     * @throws CloneNotSupportedException if the superclass cannot be cloned
+     */
+    @Override
+    public InjectStep<S> clone() throws CloneNotSupportedException {
+        final InjectStep<S> copy = (InjectStep<S>) super.clone();
+        copy.start = iteratorOver(copy.injections);
+        return copy;
+    }
+
+    /**
+     * Resets the superclass state, then points the start at a fresh iterator over the injections.
+     */
+    @Override
+    public void reset() {
+        super.reset();
+        this.start = iteratorOver(this.injections);
+    }
+
+    // Builds a new iterator positioned at the beginning of the given array.
+    private static <E> ArrayIterator<E> iteratorOver(final E[] items) {
+        return new ArrayIterator<>(items);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java
new file mode 100644
index 0000000..b3f5a32
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.function.Consumer;
+
+/**
+ * A side-effect step whose side effect is a user-supplied {@link Consumer} that is
+ * handed each traverser passing through the step.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class LambdaSideEffectStep<S> extends SideEffectStep<S> {
+
+    // The callback invoked by sideEffect() with the current traverser.
+    private final Consumer<Traverser<S>> consumer;
+
+    /**
+     * @param traversal the traversal handed to the {@link SideEffectStep} constructor
+     * @param consumer  the callback to run against each traverser
+     */
+    public LambdaSideEffectStep(final Traversal.Admin traversal, final Consumer<Traverser<S>> consumer) {
+        super(traversal);
+        this.consumer = consumer;
+    }
+
+    /**
+     * Passes the traverser to the configured consumer.
+     */
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        this.consumer.accept(traverser);
+    }
+
+    /**
+     * Builds the string form from this step and its consumer via {@code TraversalHelper.makeStepString}.
+     */
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.consumer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
new file mode 100644
index 0000000..baee373
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Step;
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.TraversalEngine;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.ProfileMapReduce;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.util.metric.StandardTraversalMetrics;
+import com.tinkerpop.gremlin.process.util.metric.TraversalMetrics;
+
+import java.util.NoSuchElementException;
+
+/**
+ * A pass-through step that times its own {@link #next()} and {@link #hasNext()} calls.
+ * Timings are recorded against this step's id in the {@link StandardTraversalMetrics}
+ * kept in the traversal's side-effects under {@link TraversalMetrics#METRICS_KEY}.
+ *
+ * @author Bob Briody (http://bobbriody.com)
+ */
+public final class ProfileStep<S> extends AbstractStep<S, S> implements Reversible, MapReducer<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
+
+    // Label passed to the metrics on initialization; null when no step was supplied.
+    private final String name;
+
+    /**
+     * Creates a profile step with no name.
+     *
+     * @param traversal the traversal handed to the {@link AbstractStep} constructor
+     */
+    public ProfileStep(final Traversal.Admin traversal) {
+        super(traversal);
+        this.name = null;
+    }
+
+    /**
+     * Creates a profile step named after the string form of the given step.
+     *
+     * @param traversal the traversal handed to the {@link AbstractStep} constructor
+     * @param step      the step whose {@code toString()} becomes the metrics name
+     */
+    public ProfileStep(final Traversal.Admin traversal, final Step step) {
+        super(traversal);
+        this.name = step.toString();
+    }
+
+    /**
+     * @return a new {@link ProfileMapReduce}
+     */
+    @Override
+    public MapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> getMapReduce() {
+        return new ProfileMapReduce();
+    }
+
+    /**
+     * Times the inherited {@code next()}. When a traverser is produced the timer is finished
+     * with that traverser's bulk; when the call throws, the timer is only stopped.
+     */
+    @Override
+    public Traverser<S> next() {
+        final StandardTraversalMetrics traversalMetrics = getTraversalMetricsUtil();
+
+        Traverser<S> ret = null;
+        traversalMetrics.start(this.getId());
+        try {
+            ret = super.next();
+            return ret;
+        } finally {
+            // ret is still null here only if super.next() threw before assigning it.
+            if (ret != null)
+                traversalMetrics.finish(this.getId(), ret.asAdmin().bulk());
+            else
+                traversalMetrics.stop(this.getId());
+        }
+    }
+
+    /**
+     * Times the inherited {@code hasNext()}. The timer is stopped in a finally block so that
+     * an exception from the inherited call does not leave it running, matching {@link #next()}.
+     */
+    @Override
+    public boolean hasNext() {
+        final StandardTraversalMetrics traversalMetrics = getTraversalMetricsUtil();
+        traversalMetrics.start(this.getId());
+        try {
+            return super.hasNext();
+        } finally {
+            traversalMetrics.stop(this.getId());
+        }
+    }
+
+    /**
+     * Forwards the next incoming traverser unchanged.
+     */
+    @Override
+    protected Traverser<S> processNextStart() throws NoSuchElementException {
+        return this.starts.next();
+    }
+
+    // Fetches (or creates) the traversal-wide metrics object and makes sure it has an
+    // entry for this step, keyed by id and carrying the step's index, name and engine flag.
+    private StandardTraversalMetrics getTraversalMetricsUtil() {
+        final StandardTraversalMetrics traversalMetrics = this.getTraversal().asAdmin().getSideEffects().getOrCreate(TraversalMetrics.METRICS_KEY, StandardTraversalMetrics::new);
+        final boolean isComputer = this.traversal.asAdmin().getEngine().get().equals(TraversalEngine.COMPUTER);
+        traversalMetrics.initializeIfNecessary(this.getId(), this.traversal.asAdmin().getSteps().indexOf(this), this.name, isComputer);
+        return traversalMetrics;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java
new file mode 100644
index 0000000..c259367
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.structure.Element;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+/**
+ * A side-effect step that folds the values found under one property key of the current
+ * element into the traverser's sack, one value at a time, using a binary operator.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackElementValueStep<S extends Element, V> extends SideEffectStep<S> {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+            TraverserRequirement.SACK,
+            TraverserRequirement.OBJECT
+    );
+
+    // Assigned once in the constructor; final like propertyKey.
+    private final BinaryOperator<V> operator;
+    private final String propertyKey;
+
+    /**
+     * @param traversal   the traversal handed to the {@link SideEffectStep} constructor
+     * @param operator    combines the current sack (first argument) with a property value (second argument)
+     * @param propertyKey the key whose values are read from the element
+     */
+    public SackElementValueStep(final Traversal.Admin traversal, final BinaryOperator<V> operator, final String propertyKey) {
+        super(traversal);
+        this.operator = operator;
+        this.propertyKey = propertyKey;
+    }
+
+    /**
+     * For every value returned by the element's value iterator for the property key,
+     * replaces the sack with {@code operator.apply(sack, value)}.
+     */
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        traverser.get().iterators().valueIterator(this.propertyKey).forEachRemaining(value -> {
+            // The sack is re-read on each iteration so every value is applied to the updated sack.
+            traverser.sack(this.operator.apply(traverser.sack(), (V) value));
+        });
+    }
+
+    /**
+     * Builds the string form from this step, its operator and its property key.
+     */
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.operator, this.propertyKey);
+    }
+
+    /**
+     * @return the fixed requirement set: {@code SACK} and {@code OBJECT}
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java
new file mode 100644
index 0000000..1f664a8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * A side-effect step that replaces the traverser's sack with the result of applying
+ * a function to the current sack and the traverser's current object.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackObjectStep<S, V> extends SideEffectStep<S> {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+            TraverserRequirement.SACK,
+            TraverserRequirement.OBJECT
+    );
+
+    private final BiFunction<V, S, V> operator;
+
+    /**
+     * @param traversal the traversal handed to the {@link SideEffectStep} constructor
+     * @param operator  maps (current sack, current object) to the new sack
+     */
+    public SackObjectStep(final Traversal.Admin traversal, final BiFunction<V, S, V> operator) {
+        super(traversal);
+        this.operator = operator;
+    }
+
+    /**
+     * Reads the sack and the current object, applies the operator, and stores the result as the new sack.
+     */
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        final V currentSack = traverser.sack();
+        final S currentObject = traverser.get();
+        final V updatedSack = this.operator.apply(currentSack, currentObject);
+        traverser.sack(updatedSack);
+    }
+
+    /**
+     * @return the fixed requirement set: {@code SACK} and {@code OBJECT}
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java
new file mode 100644
index 0000000..de42a67
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.util.SupplyingBarrierStep;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+
+import java.util.*;
+
+/**
+ * A supplying barrier step that emits side-effect values of the traversal: the single
+ * value when exactly one key is configured, otherwise a map from each key to its value.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectCapStep<S, E> extends SupplyingBarrierStep<S, E> implements SideEffectRegistrar {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+            TraverserRequirement.SIDE_EFFECTS,
+            TraverserRequirement.OBJECT
+    );
+
+    // Not final: replaced in registerSideEffects() when no keys were supplied.
+    private List<String> sideEffectKeys;
+
+    /**
+     * @param traversal      the traversal handed to the {@link SupplyingBarrierStep} constructor
+     * @param sideEffectKeys the side-effect keys to emit; may be empty
+     */
+    public SideEffectCapStep(final Traversal.Admin traversal, final String... sideEffectKeys) {
+        super(traversal);
+        this.sideEffectKeys = Arrays.asList(sideEffectKeys);
+    }
+
+    /**
+     * When no keys were supplied, adopts the side-effect key of the previous step.
+     * The previous step is cast to {@link SideEffectCapable} without a type check.
+     */
+    @Override
+    public void registerSideEffects() {
+        if (this.sideEffectKeys.isEmpty())
+            this.sideEffectKeys = Collections.singletonList(((SideEffectCapable) this.getPreviousStep()).getSideEffectKey());
+    }
+
+    /**
+     * Builds the string form from this step and its key list.
+     */
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKeys);
+    }
+
+    /**
+     * @return the current key list (the internal list itself, not a copy)
+     */
+    public List<String> getSideEffectKeys() {
+        return this.sideEffectKeys;
+    }
+
+    /**
+     * @return the fixed requirement set: {@code SIDE_EFFECTS} and {@code OBJECT}
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+
+    /**
+     * @return the side-effect value for the only key when there is exactly one key,
+     * otherwise the result of {@link #getMapOfSideEffects()} cast to {@code E}
+     */
+    @Override
+    public E supply() {
+        return this.sideEffectKeys.size() == 1 ?
+                this.getTraversal().asAdmin().getSideEffects().get(this.sideEffectKeys.get(0)) :
+                (E) this.getMapOfSideEffects();
+    }
+
+    /**
+     * @return a new {@link HashMap} holding, for every configured key, the value looked up
+     * in the traversal's side-effects
+     */
+    public Map<String, Object> getMapOfSideEffects() {
+        final Map<String, Object> sideEffects = new HashMap<>();
+        for (final String sideEffectKey : this.sideEffectKeys) {
+            sideEffects.put(sideEffectKey, this.getTraversal().asAdmin().getSideEffects().get(sideEffectKey));
+        }
+        return sideEffects;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java
new file mode 100644
index 0000000..7bc47b3
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+
+/**
+ * Base class for steps that run a side effect against each traverser and then
+ * emit that same traverser.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class SideEffectStep<S> extends AbstractStep<S, S> implements Reversible {
+
+    /**
+     * @param traversal the traversal handed to the {@link AbstractStep} constructor
+     */
+    public SideEffectStep(final Traversal.Admin traversal) {
+        super(traversal);
+    }
+
+    /**
+     * The side effect to run for the given traverser; called from {@link #processNextStart()}.
+     *
+     * @param traverser the traverser just pulled from the starts
+     */
+    protected abstract void sideEffect(final Traverser.Admin<S> traverser);
+
+    /**
+     * Pulls the next start, runs {@link #sideEffect} on it, and returns it.
+     */
+    @Override
+    protected Traverser<S> processNextStart() {
+        final Traverser.Admin<S> incoming = this.starts.next();
+        this.sideEffect(incoming);
+        return incoming;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java
new file mode 100644
index 0000000..c9bc624
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+/**
+ * A step that seeds its own starts from a stored start object the first time it is asked
+ * for a traverser. The start may be an {@link Iterator} (every element becomes a traverser)
+ * or any other object (it becomes a single traverser); a null start adds nothing.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class StartStep<S> extends AbstractStep<S, S> implements Reversible {
+
+    protected Object start;
+    // True until processNextStart() has run once.
+    protected boolean first = true;
+
+    /**
+     * @param traversal the traversal handed to the {@link AbstractStep} constructor
+     * @param start     the start object, an {@link Iterator} of starts, or null
+     */
+    public StartStep(final Traversal.Admin traversal, final Object start) {
+        super(traversal);
+        this.start = start;
+    }
+
+    /**
+     * Creates a start step with a null start.
+     *
+     * @param traversal the traversal handed to the {@link AbstractStep} constructor
+     */
+    public StartStep(final Traversal.Admin traversal) {
+        this(traversal, null);
+    }
+
+    /**
+     * @return the stored start cast (unchecked) to the caller's expected type; may be null
+     */
+    public <T> T getStart() {
+        return (T) this.start;
+    }
+
+    /**
+     * Tests whether the start's runtime class is assignable to any of the given classes.
+     *
+     * @param assignableClasses the candidate classes
+     * @return true if at least one candidate is assignable from the start's class;
+     * false when there is no match or when the start is null
+     */
+    public boolean startAssignableTo(final Class... assignableClasses) {
+        // A null start has no class to test; the one-argument constructor and clone() both produce one.
+        if (null == this.start)
+            return false;
+        final Class<?> startClass = this.start.getClass();
+        return Stream.of(assignableClasses).anyMatch(check -> check.isAssignableFrom(startClass));
+    }
+
+    /**
+     * Builds the string form from this step and its start.
+     */
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.start);
+    }
+
+    /**
+     * On the first call, turns a non-null start into traversers with bulk 1 and adds them
+     * to the starts; on every call, returns the next traverser from the starts.
+     */
+    @Override
+    protected Traverser<S> processNextStart() {
+        if (this.first) {
+            if (null != this.start) {
+                if (this.start instanceof Iterator)
+                    this.starts.add(this.getTraversal().getTraverserGenerator().generateIterator((Iterator<S>) this.start, this, 1L));
+                else
+                    this.starts.add(this.getTraversal().getTraverserGenerator().generate((S) this.start, this, 1L));
+            }
+            this.first = false;
+        }
+        return this.starts.next();
+    }
+
+    /**
+     * Clones the step; the copy starts with {@code first} set and a null start.
+     */
+    @Override
+    public StartStep<S> clone() throws CloneNotSupportedException {
+        final StartStep<S> clone = (StartStep<S>) super.clone();
+        clone.first = true;
+        clone.start = null;
+        return clone;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
new file mode 100644
index 0000000..e10be8a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.StoreMapReduce;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A side-effect step that adds, for each traverser, the result of applying a child
+ * traversal to that traverser into the collection held under a side-effect key.
+ * The child traversal defaults to an {@link IdentityTraversal}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectCapable, SideEffectRegistrar, Reversible, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+    // Applied to each traverser to obtain the object to store; replaced by addLocalChild().
+    private Traversal.Admin<S, Object> storeTraversal = new IdentityTraversal<>();
+    // May be null until registerSideEffects() substitutes the step id.
+    private String sideEffectKey;
+
+    /**
+     * @param traversal     the traversal handed to the {@link SideEffectStep} constructor
+     * @param sideEffectKey the key of the target collection; may be null
+     */
+    public StoreStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+    }
+
+    /**
+     * Applies the store traversal to the traverser and hands the result, together with the
+     * traverser's bulk, to {@code TraversalHelper.addToCollection} for the side-effect collection.
+     */
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        TraversalHelper.addToCollection(
+                traverser.sideEffects(this.sideEffectKey),
+                TraversalUtil.apply(traverser.asAdmin(), this.storeTraversal),
+                traverser.bulk());
+    }
+
+    /**
+     * Falls back to the step id when no key was given, then registers a
+     * {@link BulkSetSupplier} for the key if no supplier is registered yet.
+     */
+    @Override
+    public void registerSideEffects() {
+        if (null == this.sideEffectKey) this.sideEffectKey = this.getId();
+        this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+    }
+
+    /**
+     * @return the side-effect key; null if none was given and registerSideEffects() has not run
+     */
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    /**
+     * Builds the string form from this step, its key and its store traversal.
+     */
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.storeTraversal);
+    }
+
+    /**
+     * @return a new {@link StoreMapReduce} constructed from this step
+     */
+    @Override
+    public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+        return new StoreMapReduce(this);
+    }
+
+    /**
+     * @return a singleton list holding the store traversal
+     */
+    @Override
+    public List<Traversal.Admin<S, Object>> getLocalChildren() {
+        return Collections.singletonList(this.storeTraversal);
+    }
+
+    /**
+     * Replaces the store traversal with the given child after integrating it into this step.
+     */
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> storeTraversal) {
+        this.storeTraversal = this.integrateChild(storeTraversal, TYPICAL_LOCAL_OPERATIONS);
+    }
+
+    /**
+     * @return the requirements from {@code getSelfAndChildRequirements}, seeded with
+     * {@code SIDE_EFFECTS} and {@code BULK}
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+    }
+
+    /**
+     * Clones the step and gives the copy its own integrated clone of the store traversal.
+     */
+    @Override
+    public StoreStep<S> clone() throws CloneNotSupportedException {
+        final StoreStep<S> clone = (StoreStep<S>) super.clone();
+        clone.storeTraversal = clone.integrateChild(this.storeTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+        return clone;
+    }
+}