You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/19 19:26:43 UTC

[25/40] incubator-tinkerpop git commit: the traversal steps provided by TinkerPop are the foundation for all dsl. GraphTraversal is just a dsl of traversal. Refactored the process API to reflect this concept. Fixed #592.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java
new file mode 100644
index 0000000..e4f8bef
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map.match;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * An enumerator which consumes values from an iterator and maps each value to a secondary enumerator
+ * (for example, a join)
+ * Enumerated indices cover all solutions in the secondary enumerators,
+ * in ascending order according to the value iterator and the enumerators' own indices.
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SerialEnumerator<T> implements Enumerator<T> {
+    private final String name;
+    private Iterator<T> iterator;
+    private final Function<T, Enumerator<T>> constructor;
+    private final List<Enumerator<T>> memory = new ArrayList<>();
+    private final List<T> values = new ArrayList<>();
+
+    // TODO: this only assigned, not accessed until the efficient implementation of size() is restored
+    private int completedEnumsSize = 0;
+
+    public SerialEnumerator(final String name,
+                            final Iterator<T> iterator,
+                            final Function<T, Enumerator<T>> constructor) {
+        this.name = name;
+        this.iterator = iterator;
+        this.constructor = constructor;
+    }
+
+    public int size() {
+        // TODO: restore the more efficient implementation of size() while taking into account that
+        // traversal iterators such as DefaultTraversal may return hasNext=true after first returning hasNext=false
+        /*
+        int size = completedEnumsSize;
+        if (!sideEffects.isEmpty()) {
+            size += sideEffects.get(sideEffects.size() - 1).size();
+        }
+        return size;
+        */
+
+        //*
+        int size = 0;
+        for (Enumerator<T> e : memory) size += e.size();
+        return size;
+        //*/
+    }
+
+    // note: *not* intended for random access; use binary search if this is ever needed
+    public boolean visitSolution(final int index,
+                                 final BiConsumer<String, T> visitor) {
+        int totalSize = 0;
+        int memIndex = 0;
+        while (true) {
+            if (memIndex < memory.size()) {
+                Enumerator<T> e = memory.get(memIndex);
+
+                if (e.visitSolution(index - totalSize, visitor)) {
+                    // additionally, bind the value stored in this enumerator
+                    MatchStep.visit(name, values.get(memIndex), visitor);
+
+                    return true;
+                } else {
+                    totalSize += e.size();
+                    memIndex++;
+                }
+            } else {
+                if (null == iterator) {
+                    return false;
+                } else if (!iterator.hasNext()) {
+                    // free up memory as soon as possible
+                    iterator = null;
+                    return false;
+                }
+
+                if (!memory.isEmpty()) {
+                    int lastSize = memory.get(memIndex - 1).size();
+
+                    // first remove the head enumeration if it exists and is empty
+                    // (only the head will ever be empty, avoiding wasted space)
+                    if (0 == lastSize) {
+                        memIndex--;
+                        memory.remove(memIndex);
+                        values.remove(memIndex);
+                    } else {
+                        completedEnumsSize += lastSize;
+                    }
+                }
+
+                T value = iterator.next();
+                values.add(value);
+                Enumerator<T> e = constructor.apply(value);
+                memory.add(memory.size(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java
new file mode 100644
index 0000000..eb5da9c
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map.match;
+
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+/**
+ * An enumerator of at most one element
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SimpleEnumerator<T> implements Enumerator<T> {
+
+    private final String name;
+    private Iterator<T> iterator;
+    private T element;
+
+    public SimpleEnumerator(final String name,
+                            final Iterator<T> iterator) {
+        this.name = name;
+        this.iterator = iterator;
+    }
+
+    @Override
+    public int size() {
+        return null == element ? 0 : 1;
+    }
+
+    @Override
+    public boolean visitSolution(int index, BiConsumer<String, T> visitor) {
+        if (0 != index) {
+            return false;
+        }
+
+        if (null != iterator) {
+            if (iterator.hasNext()) {
+                element = iterator.next();
+            }
+            iterator = null;
+        }
+
+        if (null != element) {
+            MatchStep.visit(name, element, visitor);
+            return true;
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java
new file mode 100644
index 0000000..f79ed92
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Element;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AddPropertyStep<S extends Element> extends SideEffectStep<S> {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT);
+
+    private final String key;
+    private final Object value;
+
+    public AddPropertyStep(final Traversal.Admin traversal, final String key, final Object value) {
+        super(traversal);
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        traverser.get().property(this.key, this.value);
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java
new file mode 100644
index 0000000..502037e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AggregateStep<S> extends CollectingBarrierStep<S> implements SideEffectCapable, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+    private Traversal.Admin<S, Object> aggregateTraversal = null;
+    private String sideEffectKey;
+
+    public AggregateStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+        this.getTraversal().asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.aggregateTraversal);
+    }
+
+    @Override
+    public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+        return new AggregateMapReduce(this);
+    }
+
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> aggregateTraversal) {
+        this.aggregateTraversal = this.integrateChild(aggregateTraversal);
+    }
+
+    @Override
+    public List<Traversal.Admin<S, Object>> getLocalChildren() {
+        return null == this.aggregateTraversal ? Collections.emptyList() : Collections.singletonList(this.aggregateTraversal);
+    }
+
+    @Override
+    public void barrierConsumer(final TraverserSet<S> traverserSet) {
+        traverserSet.forEach(traverser ->
+                TraversalHelper.addToCollection(
+                        traverser.getSideEffects().get(this.sideEffectKey),
+                        TraversalUtil.applyNullable(traverser, this.aggregateTraversal),
+                        traverser.bulk()));
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+    }
+
+    @Override
+    public AggregateStep<S> clone() {
+        final AggregateStep<S> clone = (AggregateStep<S>) super.clone();
+        if (null != this.aggregateTraversal)
+            clone.aggregateTraversal = clone.integrateChild(this.aggregateTraversal.clone());
+        return clone;
+    }
+
+    ////////
+
+    public static final class AggregateMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+        public static final String AGGREGATE_STEP_SIDE_EFFECT_KEY = "gremlin.aggregateStep.sideEffectKey";
+
+        private String sideEffectKey;
+        private Supplier<Collection> collectionSupplier;
+
+        private AggregateMapReduce() {
+
+        }
+
+        public AggregateMapReduce(final AggregateStep step) {
+            this.sideEffectKey = step.getSideEffectKey();
+            this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            super.storeState(configuration);
+            configuration.setProperty(AGGREGATE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+        }
+
+        @Override
+        public void loadState(final Configuration configuration) {
+            this.sideEffectKey = configuration.getString(AGGREGATE_STEP_SIDE_EFFECT_KEY);
+            this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return stage.equals(Stage.MAP);
+        }
+
+        @Override
+        public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
+            VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
+        }
+
+        @Override
+        public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
+            final Collection collection = this.collectionSupplier.get();
+            keyValues.forEachRemaining(keyValue -> collection.add(keyValue.getValue()));
+            return collection;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return this.sideEffectKey;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java
new file mode 100644
index 0000000..1b6b514
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphStep<S extends Element> extends StartStep<S> implements EngineDependent {
+
+    protected final Class<S> returnClass;
+    protected Object[] ids;
+    protected transient Supplier<Iterator<S>> iteratorSupplier;
+
+    public GraphStep(final Traversal.Admin traversal, final Class<S> returnClass, final Object... ids) {
+        super(traversal);
+        this.returnClass = returnClass;
+        this.ids = ids;
+        for (int i = 0; i < this.ids.length; i++) {
+            if (this.ids[i] instanceof Element)
+                this.ids[i] = ((Element) this.ids[i]).id();
+        }
+        this.iteratorSupplier = () -> (Iterator<S>) (Vertex.class.isAssignableFrom(this.returnClass) ?
+                this.getTraversal().getGraph().get().vertices(this.ids) :
+                this.getTraversal().getGraph().get().edges(this.ids));
+    }
+
+    public String toString() {
+        return TraversalHelper.makeStepString(this, Arrays.toString(this.ids), this.returnClass.getSimpleName().toLowerCase());
+    }
+
+    public boolean returnsVertices() {
+        return Vertex.class.isAssignableFrom(this.returnClass);
+    }
+
+    public boolean returnsEdges() {
+        return Edge.class.isAssignableFrom(this.returnClass);
+    }
+
+    public Class<S> getReturnClass() {
+        return this.returnClass;
+    }
+
+    public void setIteratorSupplier(final Supplier<Iterator<S>> iteratorSupplier) {
+        this.iteratorSupplier = iteratorSupplier;
+    }
+
+    public Object[] getIds() {
+        return this.ids;
+    }
+
+    public void clearIds() {
+        this.ids = new Object[0];
+    }
+
+    @Override
+    public void onEngine(final TraversalEngine traversalEngine) {
+        if (traversalEngine.isComputer()) {
+            this.iteratorSupplier = Collections::emptyIterator;
+        }
+    }
+
+    @Override
+    protected Traverser<S> processNextStart() {
+        if (this.first)
+            this.start = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
+        return super.processNextStart();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java
new file mode 100644
index 0000000..5a80c4f
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.MapHelper;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupCountSideEffectStep<S, E> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, MapReducer<E, Long, E, Long, Map<E, Long>> {
+
+    private Traversal.Admin<S, E> groupTraversal = null;
+    private String sideEffectKey;
+
+    public GroupCountSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+        this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        final Map<Object, Long> groupCountMap = traverser.sideEffects(this.sideEffectKey);
+        MapHelper.incr(groupCountMap, TraversalUtil.applyNullable(traverser.asAdmin(), this.groupTraversal), traverser.bulk());
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    @Override
+    public MapReduce<E, Long, E, Long, Map<E, Long>> getMapReduce() {
+        return new GroupCountSideEffectMapReduce<>(this);
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.groupTraversal);
+    }
+
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> groupTraversal) {
+        this.groupTraversal = this.integrateChild(groupTraversal);
+    }
+
+    @Override
+    public List<Traversal.Admin<S, E>> getLocalChildren() {
+        return null == this.groupTraversal ? Collections.emptyList() : Collections.singletonList(this.groupTraversal);
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+    }
+
+    @Override
+    public GroupCountSideEffectStep<S, E> clone() {
+        final GroupCountSideEffectStep<S, E> clone = (GroupCountSideEffectStep<S, E>) super.clone();
+        if (null != this.groupTraversal)
+            clone.groupTraversal = clone.integrateChild(this.groupTraversal.clone());
+        return clone;
+    }
+
+    ///////
+
+    public static final class GroupCountSideEffectMapReduce<E> extends StaticMapReduce<E, Long, E, Long, Map<E, Long>> {
+
+        public static final String GROUP_COUNT_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY = "gremlin.groupCountSideEffectStep.sideEffectKey";
+
+        private String sideEffectKey;
+        private Supplier<Map<E, Long>> mapSupplier;
+
+        private GroupCountSideEffectMapReduce() {
+
+        }
+
+        public GroupCountSideEffectMapReduce(final GroupCountSideEffectStep step) {
+            this.sideEffectKey = step.getSideEffectKey();
+            this.mapSupplier = step.getTraversal().asAdmin().getSideEffects().<Map<E, Long>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            super.storeState(configuration);
+            configuration.setProperty(GROUP_COUNT_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+        }
+
+        @Override
+        public void loadState(final Configuration configuration) {
+            this.sideEffectKey = configuration.getString(GROUP_COUNT_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY);
+            this.mapSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Map<E, Long>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return true;
+        }
+
+        @Override
+        public void map(final Vertex vertex, final MapEmitter<E, Long> emitter) {
+            VertexTraversalSideEffects.of(vertex).<Map<E, Number>>orElse(this.sideEffectKey, Collections.emptyMap()).forEach((k, v) -> emitter.emit(k, v.longValue()));
+        }
+
+        @Override
+        public void reduce(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
+            long counter = 0;
+            while (values.hasNext()) {
+                counter = counter + values.next();
+            }
+            emitter.emit(key, counter);
+        }
+
+        @Override
+        public void combine(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
+            reduce(key, values, emitter);
+        }
+
+        @Override
+        public Map<E, Long> generateFinalResult(final Iterator<KeyValue<E, Long>> keyValues) {
+            final Map<E, Long> map = this.mapSupplier.get();
+            keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
+            return map;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return this.sideEffectKey;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
new file mode 100644
index 0000000..d344953
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, EngineDependent, MapReducer<K, Collection<V>, K, R, Map<K, R>> {
+
+    private char state = 'k';
+    private Traversal.Admin<S, K> keyTraversal = null;
+    private Traversal.Admin<S, V> valueTraversal = null;
+    private Traversal.Admin<Collection<V>, R> reduceTraversal = null;
+    private String sideEffectKey;
+    private boolean onGraphComputer = false;
+    private Map<K, Collection<V>> tempGroupByMap;
+
+    public GroupSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+        this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        final Map<K, Collection<V>> groupMap = null == this.tempGroupByMap ? traverser.sideEffects(this.sideEffectKey) : this.tempGroupByMap; // for nested traversals and not !starts.hasNext()
+        final K key = TraversalUtil.applyNullable(traverser, keyTraversal);
+        final V value = TraversalUtil.applyNullable(traverser, valueTraversal);
+        Collection<V> values = groupMap.get(key);
+        if (null == values) {
+            values = new BulkSet<>();
+            groupMap.put(key, values);
+        }
+        TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+        //////// reducer for OLTP
+        if (!this.onGraphComputer && null != this.reduceTraversal && !this.starts.hasNext()) {
+            this.tempGroupByMap = groupMap;
+            final Map<K, R> reduceMap = new HashMap<>();
+            groupMap.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.applyNullable(vv, this.reduceTraversal)));
+            traverser.sideEffects(this.sideEffectKey, reduceMap);
+        }
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    @Override
+    public void onEngine(final TraversalEngine traversalEngine) {
+        this.onGraphComputer = traversalEngine.isComputer();
+    }
+
+    @Override
+    public MapReduce<K, Collection<V>, K, R, Map<K, R>> getMapReduce() {
+        return new GroupSideEffectMapReduce<>(this);
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
+    }
+
+    @Override
+    public <A, B> List<Traversal.Admin<A, B>> getLocalChildren() {
+        final List<Traversal.Admin<A, B>> children = new ArrayList<>(3);
+        if (null != this.keyTraversal)
+            children.add((Traversal.Admin) this.keyTraversal);
+        if (null != this.valueTraversal)
+            children.add((Traversal.Admin) this.valueTraversal);
+        if (null != this.reduceTraversal)
+            children.add((Traversal.Admin) this.reduceTraversal);
+        return children;
+    }
+
+    public Traversal.Admin<Collection<V>, R> getReduceTraversal() {
+        return this.reduceTraversal;
+    }
+
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> kvrTraversal) {
+        if ('k' == this.state) {
+            this.keyTraversal = this.integrateChild(kvrTraversal);
+            this.state = 'v';
+        } else if ('v' == this.state) {
+            this.valueTraversal = this.integrateChild(kvrTraversal);
+            this.state = 'r';
+        } else if ('r' == this.state) {
+            this.reduceTraversal = this.integrateChild(kvrTraversal);
+            this.state = 'x';
+        } else {
+            throw new IllegalStateException("The key, value, and reduce functions for group()-step have already been set");
+        }
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+    }
+
+    @Override
+    public GroupSideEffectStep<S, K, V, R> clone() {
+        final GroupSideEffectStep<S, K, V, R> clone = (GroupSideEffectStep<S, K, V, R>) super.clone();
+        if (null != this.keyTraversal)
+            clone.keyTraversal = clone.integrateChild(this.keyTraversal.clone());
+        if (null != this.valueTraversal)
+            clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
+        if (null != this.reduceTraversal)
+            clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
+        return clone;
+    }
+
+    ///////////
+
+    public static final class GroupSideEffectMapReduce<K, V, R> implements MapReduce<K, Collection<V>, K, R, Map<K, R>> {
+
+        public static final String GROUP_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY = "gremlin.groupSideEffectStep.sideEffectKey";
+        public static final String GROUP_SIDE_EFFECT_STEP_STEP_ID = "gremlin.groupSideEffectStep.stepId";
+
+        private String sideEffectKey;
+        private String groupStepId;
+        private Traversal.Admin<Collection<V>, R> reduceTraversal;
+        private Supplier<Map<K, R>> mapSupplier;
+
+        private GroupSideEffectMapReduce() {
+
+        }
+
+        public GroupSideEffectMapReduce(final GroupSideEffectStep step) {
+            this.groupStepId = step.getId();
+            this.sideEffectKey = step.getSideEffectKey();
+            this.reduceTraversal = step.getReduceTraversal();
+            this.mapSupplier = step.getTraversal().asAdmin().getSideEffects().<Map<K, R>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            MapReduce.super.storeState(configuration);
+            configuration.setProperty(GROUP_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+            configuration.setProperty(GROUP_SIDE_EFFECT_STEP_STEP_ID, this.groupStepId);
+        }
+
+        @Override
+        public void loadState(final Configuration configuration) {
+            this.sideEffectKey = configuration.getString(GROUP_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY);
+            this.groupStepId = configuration.getString(GROUP_SIDE_EFFECT_STEP_STEP_ID);
+            final Traversal.Admin<?, ?> traversal = TraversalVertexProgram.getTraversalSupplier(configuration).get();
+            if (!traversal.isLocked())
+                traversal.applyStrategies(); // TODO: this is a scary error prone requirement, but only a problem for GroupStep
+            final GroupSideEffectStep groupSideEffectStep = new TraversalMatrix<>(traversal).getStepById(this.groupStepId);
+            this.reduceTraversal = groupSideEffectStep.getReduceTraversal();
+            this.mapSupplier = traversal.getSideEffects().<Map<K, R>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return !stage.equals(Stage.COMBINE);
+        }
+
+        @Override
+        public void map(final Vertex vertex, final MapEmitter<K, Collection<V>> emitter) {
+            VertexTraversalSideEffects.of(vertex).<Map<K, Collection<V>>>orElse(this.sideEffectKey, Collections.emptyMap()).forEach(emitter::emit);
+        }
+
+        @Override
+        public void reduce(final K key, final Iterator<Collection<V>> values, final ReduceEmitter<K, R> emitter) {
+            final Set<V> set = new BulkSet<>();
+            values.forEachRemaining(set::addAll);
+            emitter.emit(key, TraversalUtil.applyNullable(set, this.reduceTraversal));
+        }
+
+        @Override
+        public Map<K, R> generateFinalResult(final Iterator<KeyValue<K, R>> keyValues) {
+            final Map<K, R> map = this.mapSupplier.get();
+            keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
+            return map;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return this.sideEffectKey;
+        }
+
+        @Override
+        public GroupSideEffectMapReduce<K, V, R> clone() {
+            try {
+                final GroupSideEffectMapReduce<K, V, R> clone = (GroupSideEffectMapReduce<K, V, R>) super.clone();
+                if (null != clone.reduceTraversal)
+                    clone.reduceTraversal = this.reduceTraversal.clone();
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return StringFactory.mapReduceString(this, this.getMemoryKey());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java
new file mode 100644
index 0000000..269f2c9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+
+import java.util.NoSuchElementException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class IdentityStep<S> extends AbstractStep<S, S> {
+
+    public IdentityStep(final Traversal.Admin traversal) {
+        super(traversal);
+    }
+
+    @Override
+    protected Traverser<S> processNextStart() throws NoSuchElementException {
+        return this.starts.next();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
new file mode 100644
index 0000000..34b5eab
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.util.iterator.ArrayIterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InjectStep<S> extends StartStep<S> {
+
+    private final S[] injections;
+
+    @SafeVarargs
+    public InjectStep(final Traversal.Admin traversal, final S... injections) {
+        super(traversal);
+        this.injections = injections;
+        this.start = new ArrayIterator<>(this.injections);
+    }
+
+    @Override
+    public InjectStep<S> clone() {
+        final InjectStep<S> clone = (InjectStep<S>) super.clone();
+        clone.start = new ArrayIterator<>(clone.injections);
+        return clone;
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        this.start = new ArrayIterator<>(this.injections);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java
new file mode 100644
index 0000000..2050489
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.function.Consumer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class LambdaSideEffectStep<S> extends SideEffectStep<S> {
+
+    private final Consumer<Traverser<S>> consumer;
+
+    public LambdaSideEffectStep(final Traversal.Admin traversal, final Consumer<Traverser<S>> consumer) {
+        super(traversal);
+        this.consumer = consumer;
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        this.consumer.accept(traverser);
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.consumer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java
new file mode 100644
index 0000000..4be3483
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DependantMutableMetrics;
+import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics;
+import org.apache.tinkerpop.gremlin.process.traversal.util.StandardTraversalMetrics;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMetrics;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * @author Bob Briody (http://bobbriody.com)
+ */
+public final class ProfileStep<S> extends AbstractStep<S, S> implements MapReducer<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
+
+    // Stored in the Traversal sideEffects but kept here as a reference for convenience.
+    private StandardTraversalMetrics traversalMetrics;
+
+    public ProfileStep(final Traversal.Admin traversal) {
+        super(traversal);
+    }
+
+
+    @Override
+    public MapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> getMapReduce() {
+        return ProfileMapReduce.instance();
+    }
+
+    @Override
+    public Traverser<S> next() {
+        Traverser<S> ret = null;
+        initializeIfNeeded();
+        traversalMetrics.start(this.getId());
+        try {
+            ret = super.next();
+            return ret;
+        } finally {
+            if (ret != null) {
+                traversalMetrics.finish(this.getId(), ret.asAdmin().bulk());
+            } else {
+                traversalMetrics.stop(this.getId());
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        initializeIfNeeded();
+        traversalMetrics.start(this.getId());
+        boolean ret = super.hasNext();
+        traversalMetrics.stop(this.getId());
+        return ret;
+    }
+
+    @Override
+    protected Traverser<S> processNextStart() throws NoSuchElementException {
+        return this.starts.next();
+    }
+
+    private void initializeIfNeeded() {
+        if (traversalMetrics != null) {
+            return;
+        }
+
+        createTraversalMetricsSideEffectIfNecessary();
+
+        // How can traversalMetrics still be null? When running on computer it may need to be re-initialized from
+        // sideEffects after serialization.
+        if (traversalMetrics == null) {
+            // look up the TraversalMetrics in the root traversal's sideEffects
+            Traversal t = this.getTraversal();
+            while (!(t.asAdmin().getParent() instanceof EmptyStep)) {
+                t = t.asAdmin().getParent().asStep().getTraversal();
+            }
+            traversalMetrics = t.asAdmin().getSideEffects().get(TraversalMetrics.METRICS_KEY);
+        }
+    }
+
+    private void createTraversalMetricsSideEffectIfNecessary() {
+        if (this.getTraversal().getSideEffects().exists(TraversalMetrics.METRICS_KEY)) {
+            // Already initialized
+            return;
+        }
+
+        if (!(this.getTraversal().getParent() instanceof EmptyStep)) {
+            // Initialization is handled at the top-level of the traversal only.
+            return;
+        }
+
+        // The following code is executed once per top-level (non-nested) Traversal for all Profile steps. (Technically,
+        // once per thread if using Computer.)
+
+        traversalMetrics = this.getTraversal().getSideEffects().getOrCreate(TraversalMetrics.METRICS_KEY, StandardTraversalMetrics::new);
+        prepTraversalForProfiling(this.getTraversal().asAdmin(), null);
+    }
+
+    // Walk the traversal steps and initialize the Metrics timers.
+    private void prepTraversalForProfiling(Traversal.Admin<?, ?> traversal, MutableMetrics parentMetrics) {
+
+        DependantMutableMetrics prevMetrics = null;
+        final List<Step> steps = traversal.getSteps();
+        for (int ii = 0; ii + 1 < steps.size(); ii = ii + 2) {
+            Step step = steps.get(ii);
+            ProfileStep profileStep = (ProfileStep) steps.get(ii + 1);
+
+            // Create metrics
+            MutableMetrics metrics;
+
+            // Computer metrics are "stand-alone" but Standard metrics handle double-counted upstream time.
+            if (traversal.getEngine().isComputer()) {
+                metrics = new MutableMetrics(step.getId(), step.toString());
+            } else {
+                metrics = new DependantMutableMetrics(step.getId(), step.toString(), prevMetrics);
+                prevMetrics = (DependantMutableMetrics) metrics;
+            }
+
+            // Initialize counters (necessary because some steps might end up being 0)
+            metrics.incrementCount(TraversalMetrics.ELEMENT_COUNT_ID, 0);
+            metrics.incrementCount(TraversalMetrics.TRAVERSER_COUNT_ID, 0);
+
+            // Add metrics to parent, if necessary
+            if (parentMetrics != null) {
+                parentMetrics.addNested(metrics);
+            }
+
+            // The TraversalMetrics sideEffect is shared across all the steps.
+            profileStep.traversalMetrics = this.traversalMetrics;
+
+            // Add root metrics to traversalMetrics
+            this.traversalMetrics.addMetrics(metrics, step.getId(), ii / 2, parentMetrics == null, profileStep.getId());
+
+            // Handle nested traversal
+            if (step instanceof TraversalParent) {
+                for (Traversal.Admin<?, ?> t : ((TraversalParent) step).getLocalChildren()) {
+                    prepTraversalForProfiling(t, metrics);
+                }
+                for (Traversal.Admin<?, ?> t : ((TraversalParent) step).getGlobalChildren()) {
+                    prepTraversalForProfiling(t, metrics);
+                }
+            }
+        }
+    }
+
+    //////////////////
+
+    public static final class ProfileMapReduce extends StaticMapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
+
+        private static ProfileMapReduce INSTANCE = new ProfileMapReduce();
+
+        private ProfileMapReduce() {
+
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return true;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return TraversalMetrics.METRICS_KEY;
+        }
+
+        @Override
+        public void map(final Vertex vertex, final MapEmitter<NullObject, StandardTraversalMetrics> emitter) {
+            VertexTraversalSideEffects.of(vertex).<StandardTraversalMetrics>ifPresent(TraversalMetrics.METRICS_KEY, emitter::emit);
+        }
+
+        @Override
+        public void combine(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
+            reduce(key, values, emitter);
+        }
+
+        @Override
+        public void reduce(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
+            emitter.emit(StandardTraversalMetrics.merge(values));
+        }
+
+        @Override
+        public StandardTraversalMetrics generateFinalResult(final Iterator<KeyValue<NullObject, StandardTraversalMetrics>> keyValues) {
+            return keyValues.next().getValue();
+        }
+
+        public static ProfileMapReduce instance() {
+            return INSTANCE;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java
new file mode 100644
index 0000000..412164a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Element;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackElementValueStep<S extends Element, V> extends SideEffectStep<S> {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+            TraverserRequirement.SACK,
+            TraverserRequirement.OBJECT
+    );
+
+    private BinaryOperator<V> operator;
+    private final String propertyKey;
+
+    public SackElementValueStep(final Traversal.Admin traversal, final BinaryOperator<V> operator, final String propertyKey) {
+        super(traversal);
+        this.operator = operator;
+        this.propertyKey = propertyKey;
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        traverser.get().values(this.propertyKey).forEachRemaining(value -> {
+            traverser.sack(this.operator.apply(traverser.sack(), (V) value));
+        });
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.operator, this.propertyKey);
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java
new file mode 100644
index 0000000..cae570a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackObjectStep<S, V> extends SideEffectStep<S> {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+            TraverserRequirement.SACK,
+            TraverserRequirement.OBJECT
+    );
+
+    private final BiFunction<V, S, V> operator;
+
+    public SackObjectStep(final Traversal.Admin traversal, final BiFunction<V, S, V> operator) {
+        super(traversal);
+        this.operator = operator;
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        traverser.sack(this.operator.apply(traverser.sack(), traverser.get()));
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java
new file mode 100644
index 0000000..28f1cf0
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.SupplyingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectCapStep<S, E> extends SupplyingBarrierStep<S, E> {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+            TraverserRequirement.SIDE_EFFECTS,
+            TraverserRequirement.OBJECT
+    );
+
+    private List<String> sideEffectKeys;
+
+    public SideEffectCapStep(final Traversal.Admin traversal, final String... sideEffectKeys) {
+        super(traversal);
+        if (0 == sideEffectKeys.length)
+            throw new IllegalArgumentException("At least one sideEffect key must be provided to " + this.getClass().getSimpleName());
+        this.sideEffectKeys = Arrays.asList(sideEffectKeys);
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKeys);
+    }
+
+    public List<String> getSideEffectKeys() {
+        return this.sideEffectKeys;
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+
+    @Override
+    public E supply() {
+        return this.sideEffectKeys.size() == 1 ?
+                this.getTraversal().asAdmin().getSideEffects().get(this.sideEffectKeys.get(0)) :
+                (E) this.getMapOfSideEffects();
+    }
+
+    public Map<String, Object> getMapOfSideEffects() {
+        final Map<String, Object> sideEffects = new HashMap<>();
+        for (final String sideEffectKey : this.sideEffectKeys) {
+            sideEffects.put(sideEffectKey, this.getTraversal().asAdmin().getSideEffects().get(sideEffectKey));
+        }
+        return sideEffects;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java
new file mode 100644
index 0000000..2854e6e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class SideEffectStep<S> extends AbstractStep<S, S> {
+
+    public SideEffectStep(final Traversal.Admin traversal) {
+        super(traversal);
+    }
+
+    protected abstract void sideEffect(final Traverser.Admin<S> traverser);
+
+    @Override
+    protected Traverser<S> processNextStart() {
+        final Traverser.Admin<S> traverser = this.starts.next();
+        this.sideEffect(traverser);
+        return traverser;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java
new file mode 100644
index 0000000..5e762fe
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class StartStep<S> extends AbstractStep<S, S> {
+
+    protected Object start;
+    protected boolean first = true;
+
+    public StartStep(final Traversal.Admin traversal, final Object start) {
+        super(traversal);
+        this.start = start;
+    }
+
+    public StartStep(final Traversal.Admin traversal) {
+        this(traversal, null);
+    }
+
+    public <T> T getStart() {
+        return (T) this.start;
+    }
+
+    public boolean startAssignableTo(final Class... assignableClasses) {
+        return Stream.of(assignableClasses).filter(check -> check.isAssignableFrom(this.start.getClass())).findAny().isPresent();
+    }
+
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.start);
+    }
+
+    @Override
+    protected Traverser<S> processNextStart() {
+        if (this.first) {
+            if (null != this.start) {
+                if (this.start instanceof Iterator)
+                    this.starts.add(this.getTraversal().getTraverserGenerator().generateIterator((Iterator<S>) this.start, this, 1l));
+                else
+                    this.starts.add(this.getTraversal().getTraverserGenerator().generate((S) this.start, this, 1l));
+            }
+            this.first = false;
+        }
+        return this.starts.next();
+    }
+
+    @Override
+    public StartStep<S> clone() {
+        final StartStep<S> clone = (StartStep<S>) super.clone();
+        clone.first = true;
+        clone.start = null;
+        return clone;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
new file mode 100644
index 0000000..20c92d6
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+    private Traversal.Admin<S, Object> storeTraversal = null;
+    private String sideEffectKey;
+
+    public StoreStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+        this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<S> traverser) {
+        TraversalHelper.addToCollection(
+                traverser.sideEffects(this.sideEffectKey),
+                TraversalUtil.applyNullable(traverser.asAdmin(), this.storeTraversal),
+                traverser.bulk());
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey, this.storeTraversal);
+    }
+
+    @Override
+    public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+        return new StoreMapReduce(this);
+    }
+
+    @Override
+    public List<Traversal.Admin<S, Object>> getLocalChildren() {
+        return null == this.storeTraversal ? Collections.emptyList() : Collections.singletonList(this.storeTraversal);
+    }
+
+    @Override
+    public void addLocalChild(final Traversal.Admin<?, ?> storeTraversal) {
+        this.storeTraversal = this.integrateChild(storeTraversal);
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+    }
+
+    @Override
+    public StoreStep<S> clone() {
+        final StoreStep<S> clone = (StoreStep<S>) super.clone();
+        if (null != this.storeTraversal)
+            clone.storeTraversal = clone.integrateChild(this.storeTraversal.clone());
+        return clone;
+    }
+
+    ///////////////
+
+    public static final class StoreMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+        public static final String STORE_STEP_SIDE_EFFECT_KEY = "gremlin.storeStep.sideEffectKey";
+
+        private String sideEffectKey;
+        private Supplier<Collection> collectionSupplier;
+
+        private StoreMapReduce() {
+
+        }
+
+        public StoreMapReduce(final StoreStep step) {
+            this.sideEffectKey = step.getSideEffectKey();
+            this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            super.storeState(configuration);
+            configuration.setProperty(STORE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+        }
+
+        @Override
+        public void loadState(final Configuration configuration) {
+            this.sideEffectKey = configuration.getString(STORE_STEP_SIDE_EFFECT_KEY);
+            this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return stage.equals(Stage.MAP);
+        }
+
+        @Override
+        public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
+            VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
+        }
+
+        @Override
+        public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
+            final Collection collection = this.collectionSupplier.get();
+            keyValues.forEachRemaining(pair -> collection.add(pair.getValue()));
+            return collection;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return this.sideEffectKey;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java
new file mode 100644
index 0000000..0657585
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.T;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A side-effect step that produces an edge induced subgraph.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SubgraphStep extends SideEffectStep<Edge> implements SideEffectCapable {
+
+    private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+            TraverserRequirement.OBJECT,
+            TraverserRequirement.SIDE_EFFECTS
+    );
+
+    private Graph subgraph;
+    private String sideEffectKey;
+
+    private static final Map<String, Object> DEFAULT_CONFIGURATION = new HashMap<String, Object>() {{
+        put(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph"); // hard coded because TinkerGraph is not part of gremlin-core
+    }};
+
+    // TODO: add support for side-effecting out an edge list.
+
+    public SubgraphStep(final Traversal.Admin traversal, final String sideEffectKey) {
+        super(traversal);
+        this.sideEffectKey = sideEffectKey;
+        this.getTraversal().asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, () -> GraphFactory.open(DEFAULT_CONFIGURATION));
+    }
+
+    @Override
+    protected void sideEffect(final Traverser.Admin<Edge> traverser) {
+        if (null == this.subgraph) {
+            this.subgraph = traverser.sideEffects(this.sideEffectKey);
+            if (!this.subgraph.features().vertex().supportsUserSuppliedIds() || !this.subgraph.features().edge().supportsUserSuppliedIds())
+                throw new IllegalArgumentException("The provided subgraph must support user supplied ids for vertices and edges: " + this.subgraph);
+        }
+        SubgraphStep.addEdgeToSubgraph(this.subgraph, traverser.get());
+    }
+
+    @Override
+    public String getSideEffectKey() {
+        return this.sideEffectKey;
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.sideEffectKey);
+    }
+
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
+    }
+
+    @Override
+    public SubgraphStep clone() {
+        final SubgraphStep clone = (SubgraphStep) super.clone();
+        this.subgraph = null;
+        return clone;
+    }
+
+    ///
+
+    private static Vertex getOrCreate(final Graph subgraph, final Vertex vertex) {
+        final Iterator<Vertex> vertexIterator = subgraph.vertices(vertex.id());
+        if (vertexIterator.hasNext()) return vertexIterator.next();
+        final Vertex subgraphVertex = subgraph.addVertex(T.id, vertex.id(), T.label, vertex.label());
+        vertex.properties().forEachRemaining(vertexProperty -> {
+            final VertexProperty<?> subgraphVertexProperty = subgraphVertex.property(vertexProperty.key(), vertexProperty.value(), T.id, vertexProperty.id()); // TODO: demand vertex property id?
+            vertexProperty.properties().forEachRemaining(property -> subgraphVertexProperty.<Object>property(property.key(), property.value()));
+        });
+        return subgraphVertex;
+    }
+
+    private static void addEdgeToSubgraph(final Graph subgraph, final Edge edge) {
+        final Iterator<Edge> edgeIterator = subgraph.edges(edge.id());
+        if (edgeIterator.hasNext()) return;
+        final Iterator<Vertex> vertexIterator = edge.vertices(Direction.BOTH);
+        final Vertex subGraphOutVertex = getOrCreate(subgraph, vertexIterator.next());
+        final Vertex subGraphInVertex = getOrCreate(subgraph, vertexIterator.next());
+        final Edge subGraphEdge = subGraphOutVertex.addEdge(edge.label(), subGraphInVertex, T.id, edge.id());
+        edge.properties().forEachRemaining(property -> subGraphEdge.<Object>property(property.key(), property.value()));
+    }
+}