You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:48 UTC

[22/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop directories to org/apache/tinkerpop

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
new file mode 100644
index 0000000..e0f6273
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.ranking.pagerank;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.MessageCombiner;
+import com.tinkerpop.gremlin.process.computer.MessageScope;
+import com.tinkerpop.gremlin.process.computer.Messenger;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
+import com.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
+import com.tinkerpop.gremlin.process.graph.traversal.__;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.VertexProperty;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.StreamFactory;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/** Vertex program that computes a PageRank score per vertex over a fixed number of iterations, storing it under {@link #PAGE_RANK}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PageRankVertexProgram extends StaticVertexProgram<Double> {
+
+    private MessageScope.Local<Double> incidentMessageScope = MessageScope.Local.of(__::outE); // default scope built from __::outE; replaced in loadState when a supplier is configured
+    private MessageScope.Local<Double> countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope)); // reverse of the incident scope; used only in the initial iteration
+
+    public static final String PAGE_RANK = "gremlin.pageRankVertexProgram.pageRank"; // vertex property key holding the current rank
+    public static final String EDGE_COUNT = "gremlin.pageRankVertexProgram.edgeCount"; // vertex property key holding the summed count messages
+
+    private static final String VERTEX_COUNT = "gremlin.pageRankVertexProgram.vertexCount"; // configuration key (see loadState/storeState)
+    private static final String ALPHA = "gremlin.pageRankVertexProgram.alpha"; // configuration key
+    private static final String TOTAL_ITERATIONS = "gremlin.pageRankVertexProgram.totalIterations"; // configuration key
+    private static final String INCIDENT_TRAVERSAL_SUPPLIER = "gremlin.pageRankVertexProgram.incidentTraversalSupplier"; // configuration key for the optional custom incident traversal
+
+    private LambdaHolder<Supplier<Traversal<Vertex, Edge>>> traversalSupplier; // null unless a custom incident traversal was configured
+    private double vertexCountAsDouble = 1.0d; // overwritten in loadState from VERTEX_COUNT
+    private double alpha = 0.85d; // overwritten in loadState from ALPHA
+    private int totalIterations = 30; // overwritten in loadState from TOTAL_ITERATIONS
+
+    private static final Set<String> COMPUTE_KEYS = new HashSet<>(Arrays.asList(PAGE_RANK, EDGE_COUNT)); // returned by getElementComputeKeys()
+
+    private PageRankVertexProgram() { // private: not directly instantiable from outside this class
+
+    }
+
+    @Override
+    public void loadState(final Configuration configuration) { // restores supplier, vertex count, alpha and iteration limit from the configuration
+        this.traversalSupplier = LambdaHolder.loadState(configuration, INCIDENT_TRAVERSAL_SUPPLIER);
+        if (null != this.traversalSupplier) { // a custom incident traversal is optional
+            VertexProgramHelper.verifyReversibility(this.traversalSupplier.get().get().asAdmin()); // checked because the count scope below is the reverse of this traversal
+            this.incidentMessageScope = MessageScope.Local.of(this.traversalSupplier.get());
+            this.countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope)); // rebuilt so it reverses the new incident scope
+        }
+        this.vertexCountAsDouble = configuration.getDouble(VERTEX_COUNT, 1.0d); // defaults to 1.0 when the key is absent
+        this.alpha = configuration.getDouble(ALPHA, 0.85d); // defaults to 0.85 when the key is absent
+        this.totalIterations = configuration.getInt(TOTAL_ITERATIONS, 30); // defaults to 30 when the key is absent
+    }
+
+    @Override
+    public void storeState(final Configuration configuration) { // writes the same keys that loadState reads, plus the program class name
+        configuration.setProperty(VERTEX_PROGRAM, PageRankVertexProgram.class.getName());
+        configuration.setProperty(VERTEX_COUNT, this.vertexCountAsDouble);
+        configuration.setProperty(ALPHA, this.alpha);
+        configuration.setProperty(TOTAL_ITERATIONS, this.totalIterations);
+        if (null != this.traversalSupplier) { // nothing to persist when the default outE scope is in use
+            this.traversalSupplier.storeState(configuration);
+        }
+    }
+
+    @Override
+    public Set<String> getElementComputeKeys() { // PAGE_RANK and EDGE_COUNT
+        return COMPUTE_KEYS;
+    }
+
+    @Override
+    public Optional<MessageCombiner<Double>> getMessageCombiner() {
+        return (Optional) PageRankMessageCombiner.instance(); // raw Optional cast to fit the declared generic return type
+    }
+
+    @Override
+    public Set<MessageScope> getMessageScopes(final Memory memory) { // one scope per iteration, matching what execute() sends on
+        final Set<MessageScope> set = new HashSet<>();
+        set.add(memory.isInitialIteration() ? this.countMessageScope : this.incidentMessageScope); // count scope first, incident scope afterwards
+        return set;
+    }
+
+    @Override
+    public void setup(final Memory memory) { // intentionally empty: no memory values are initialised
+
+    }
+
+    @Override
+    public void execute(final Vertex vertex, Messenger<Double> messenger, final Memory memory) { // three phases: count edges, seed ranks, then iterate
+        if (memory.isInitialIteration()) { // phase 1: send 1.0 along the count scope so receivers can sum an edge count
+            messenger.sendMessage(this.countMessageScope, 1.0d);
+        } else if (1 == memory.getIteration()) { // phase 2: derive edge count and seed the rank
+            double initialPageRank = 1.0d / this.vertexCountAsDouble; // uniform starting rank
+            double edgeCount = StreamFactory.stream(messenger.receiveMessages(this.countMessageScope)).reduce(0.0d, (a, b) -> a + b); // sum of the 1.0 messages; 0.0 when none arrive
+            vertex.property(VertexProperty.Cardinality.single,PAGE_RANK, initialPageRank);
+            vertex.property(VertexProperty.Cardinality.single,EDGE_COUNT, edgeCount);
+            messenger.sendMessage(this.incidentMessageScope, initialPageRank / edgeCount); // NOTE(review): non-finite when edgeCount is 0.0 -- presumably never delivered in that case; confirm
+        } else { // phase 3: standard update from the incoming rank shares
+            double newPageRank = StreamFactory.stream(messenger.receiveMessages(this.incidentMessageScope)).reduce(0.0d, (a, b) -> a + b); // sum of incoming shares
+            newPageRank = (this.alpha * newPageRank) + ((1.0d - this.alpha) / this.vertexCountAsDouble); // alpha-weighted sum plus the (1 - alpha) / vertexCount term
+            vertex.property(VertexProperty.Cardinality.single,PAGE_RANK, newPageRank);
+            messenger.sendMessage(this.incidentMessageScope, newPageRank / vertex.<Double>value(EDGE_COUNT)); // share the new rank, divided by the stored edge count
+        }
+    }
+
+    @Override
+    public boolean terminate(final Memory memory) { // stops once the iteration counter reaches totalIterations
+        return memory.getIteration() >= this.totalIterations;
+    }
+
+    @Override
+    public String toString() { // includes alpha and the iteration limit
+        return StringFactory.vertexProgramString(this, "alpha=" + this.alpha + ",iterations=" + this.totalIterations);
+    }
+
+    //////////////////////////////
+
+    public static Builder build() { // entry point for configuring this program
+        return new Builder();
+    }
+
+    public static class Builder extends AbstractVertexProgramBuilder<Builder> { // each setter writes one configuration key and returns this
+
+        private Builder() {
+            super(PageRankVertexProgram.class);
+        }
+
+        public Builder iterations(final int iterations) { // stored under TOTAL_ITERATIONS
+            this.configuration.setProperty(TOTAL_ITERATIONS, iterations);
+            return this;
+        }
+
+        public Builder alpha(final double alpha) { // stored under ALPHA
+            this.configuration.setProperty(ALPHA, alpha);
+            return this;
+        }
+
+        public Builder incident(final String scriptEngine, final String traversalScript) { // incident traversal given as a script for the named engine
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, INCIDENT_TRAVERSAL_SUPPLIER, new String[]{scriptEngine, traversalScript});
+            return this;
+        }
+
+        public Builder incident(final String traversalScript) { // same as above, using the GREMLIN_GROOVY engine
+            return incident(GREMLIN_GROOVY, traversalScript);
+        }
+
+        public Builder incident(final Supplier<Traversal<Vertex, Edge>> traversal) { // incident traversal given as a supplier object
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, INCIDENT_TRAVERSAL_SUPPLIER, traversal);
+            return this;
+        }
+
+        public Builder incident(final Class<Supplier<Traversal<Vertex, Edge>>> traversalClass) { // incident traversal given as a supplier class
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, INCIDENT_TRAVERSAL_SUPPLIER, traversalClass);
+            return this;
+        }
+
+        public Builder vertexCount(final long vertexCount) { // stored as a double, matching configuration.getDouble in loadState
+            this.configuration.setProperty(VERTEX_COUNT, (double) vertexCount);
+            return this;
+        }
+    }
+
+    ////////////////////////////
+
+    @Override
+    public Features getFeatures() { // declares that this program uses local message scopes and adds vertex properties
+        return new Features() {
+            @Override
+            public boolean requiresLocalMessageScopes() {
+                return true;
+            }
+
+            @Override
+            public boolean requiresVertexPropertyAddition() {
+                return true;
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalScript.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalScript.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalScript.java
new file mode 100644
index 0000000..af1437a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalScript.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.computer.ComputerResult;
+import com.tinkerpop.gremlin.process.computer.GraphComputer;
+import com.tinkerpop.gremlin.structure.Graph;
+
+import java.util.concurrent.Future;
+
+/** Contract for a traversal supplied as a script; S and E are the type parameters of the resulting {@link Traversal}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface TraversalScript<S, E> {
+
+    public TraversalScript<S, E> over(final Graph graph); // presumably selects the graph to evaluate against -- confirm in implementations
+
+    public TraversalScript<S, E> using(final GraphComputer graphComputer); // presumably selects the computer that runs the program -- confirm in implementations
+
+    public TraversalVertexProgram program(); // yields a TraversalVertexProgram; how it is built is implementation-defined
+
+    public Future<Traversal<S, E>> traversal(); // asynchronous: the traversal is delivered through a Future
+
+    public Future<ComputerResult> result(); // asynchronous: the ComputerResult is delivered through a Future
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalSupplier.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalSupplier.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalSupplier.java
new file mode 100644
index 0000000..2e6f7ac
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalSupplier.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal;
+
+import com.tinkerpop.gremlin.process.Traversal;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/** Serializable {@link Supplier} that wraps one {@link Traversal.Admin} and hands back that same instance on every call.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalSupplier<S, E> implements Supplier<Traversal.Admin<S, E>>, Serializable {
+
+    private final Traversal.Admin<S, E> traversal; // the wrapped traversal, fixed at construction
+
+    public TraversalSupplier(final Traversal.Admin<S, E> traversal) { // stores the reference as-is (no copy, no null check)
+        this.traversal = traversal;
+    }
+
+    @Override
+    public Traversal.Admin<S, E> get() { // returns the stored instance itself, not a clone
+        return this.traversal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
new file mode 100644
index 0000000..fd72c84
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.TraversalEngine;
+import com.tinkerpop.gremlin.process.TraversalSideEffects;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.TraverserGenerator;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.MessageCombiner;
+import com.tinkerpop.gremlin.process.computer.MessageScope;
+import com.tinkerpop.gremlin.process.computer.Messenger;
+import com.tinkerpop.gremlin.process.computer.VertexProgram;
+import com.tinkerpop.gremlin.process.computer.traversal.step.sideEffect.mapreduce.TraverserMapReduce;
+import com.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import com.tinkerpop.gremlin.process.computer.util.LambdaHolder;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.GraphStep;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.SideEffectCapStep;
+import com.tinkerpop.gremlin.process.traversal.TraversalMatrix;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.util.TraverserSet;
+import com.tinkerpop.gremlin.structure.Direction;
+import com.tinkerpop.gremlin.structure.Element;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.ElementHelper;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+
+
+/**
+ * TraversalVertexProgram enables the evaluation of a {@link Traversal} on a {@link com.tinkerpop.gremlin.process.computer.GraphComputer}.
+ * At the start of the computation, each {@link Vertex} (or {@link com.tinkerpop.gremlin.structure.Edge}) is assigned a single {@link Traverser}.
+ * For each traverser that is local to the vertex, the vertex looks up its current location in the traversal and processes that step.
+ * If the outputted traverser of the step references a local structure on the vertex (e.g. the vertex, an incident edge, its properties, or an arbitrary object),
+ * then the vertex continues to compute the next traverser. If the traverser references another location in the graph,
+ * then the traverser is sent to that location in the graph via a message. The messages of TraversalVertexProgram are traversers.
+ * This continues until all traversers in the computation have halted.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalVertexProgram implements VertexProgram<TraverserSet<?>> {
+
+    public static final String HALTED_TRAVERSERS = "gremlin.traversalVertexProgram.haltedTraversers"; // vertex property key holding a TraverserSet (set in execute)
+    private static final String VOTE_TO_HALT = "gremlin.traversalVertexProgram.voteToHalt"; // memory key holding the termination flag
+    public static final String TRAVERSAL_SUPPLIER = "gremlin.traversalVertexProgram.traversalSupplier"; // configuration key for the traversal supplier
+
+    // TODO: if not an adjacent traversal, use Local message scope -- a dual messaging system.
+    private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
+    private static final Set<String> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(HALTED_TRAVERSERS, TraversalSideEffects.SIDE_EFFECTS));
+    private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
+
+    private LambdaHolder<Supplier<Traversal.Admin<?, ?>>> traversalSupplier; // assigned in loadState
+    private Traversal.Admin<?, ?> traversal; // obtained from the supplier in loadState
+    private TraversalMatrix<?, ?> traversalMatrix; // built from the traversal in loadState and rebuilt in clone
+
+    private final Set<MapReduce> mapReducers = new HashSet<>(); // populated in loadState
+
+    private TraversalVertexProgram() { // private: not directly instantiable from outside this class
+    }
+
+    /**
+     * A helper method to yield a {@link Supplier} of {@link Traversal} from the {@link Configuration}.
+     * The supplier is either a {@link Class}, {@link com.tinkerpop.gremlin.process.computer.util.ScriptEngineLambda}, or a direct Java8 lambda.
+     *
+     * @param configuration The configuration containing the public static TRAVERSAL_SUPPLIER key.
+     * @return the traversal supplier in the configuration
+     */
+    public static Supplier<Traversal.Admin<?, ?>> getTraversalSupplier(final Configuration configuration) {
+        return LambdaHolder.<Supplier<Traversal.Admin<?, ?>>>loadState(configuration, TraversalVertexProgram.TRAVERSAL_SUPPLIER).get(); // NOTE(review): the instance loadState null-checks this result; here a null would throw NullPointerException -- confirm
+    }
+
+    public Traversal.Admin<?, ?> getTraversal() { // the traversal set by loadState (null before loadState runs)
+        return this.traversal;
+    }
+
+    @Override
+    public void loadState(final Configuration configuration) { // builds the traversal, its matrix and the MapReduce set from the configuration
+        this.traversalSupplier = LambdaHolder.loadState(configuration, TRAVERSAL_SUPPLIER);
+        if (null == this.traversalSupplier) { // a traversal supplier is mandatory for this program
+            throw new IllegalArgumentException("The configuration does not have a traversal supplier");
+        }
+        this.traversal = this.traversalSupplier.get().get();
+        if (!this.traversal.getEngine().isPresent()) // strategies are applied only when no engine is set on the traversal yet
+            this.traversal.applyStrategies(TraversalEngine.COMPUTER);
+        this.traversalMatrix = new TraversalMatrix<>(this.traversal);
+        for (final MapReducer<?, ?, ?, ?, ?> mapReducer : TraversalHelper.getStepsOfAssignableClassRecurssively(MapReducer.class, this.traversal)) { // collect the MapReduce job of every MapReducer step
+            this.mapReducers.add(mapReducer.getMapReduce());
+        }
+        if (!(this.traversal.getEndStep() instanceof SideEffectCapStep)) // otherwise add a TraverserMapReduce built from the end step
+            this.mapReducers.add(new TraverserMapReduce(this.traversal.getEndStep()));
+    }
+
+    @Override
+    public void storeState(final Configuration configuration) { // default interface state plus the traversal supplier
+        VertexProgram.super.storeState(configuration);
+        this.traversalSupplier.storeState(configuration);
+    }
+
+    @Override
+    public void setup(final Memory memory) { // the flag starts as true; execute() folds each vertex's vote into it
+        memory.set(VOTE_TO_HALT, true);
+    }
+
+    @Override
+    public Set<MessageScope> getMessageScopes(final Memory memory) { // always the single global scope, regardless of iteration
+        return MESSAGE_SCOPES;
+    }
+
+    @Override
+    public void execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final Memory memory) { // first iteration seeds traversers; later ones delegate to TraverserExecutor
+        this.traversal.getSideEffects().setLocalVertex(vertex);
+        if (memory.isInitialIteration()) {
+            final TraverserSet<Object> haltedTraversers = new TraverserSet<>();
+            vertex.property(HALTED_TRAVERSERS, haltedTraversers); // attached up front; TraverserExecutor reads this property in later iterations
+
+            if (!(this.traversal.getStartStep() instanceof GraphStep))                              // TODO: make this generic to Traversal
+                throw new UnsupportedOperationException("TraversalVertexProgram currently only supports GraphStep starts on vertices or edges");
+
+            final GraphStep<Element> startStep = (GraphStep<Element>) this.traversal.getStartStep();
+            final TraverserGenerator traverserGenerator = this.traversal.getTraverserGenerator();
+            final String future = startStep.getNextStep().getId(); // id of the step following the start step; new traversers are pointed at it
+            boolean voteToHalt = true; // stays true unless at least one non-halted traverser is created
+            final Iterator<? extends Element> starts = startStep.returnsVertices() ? IteratorUtils.of(vertex) : vertex.iterators().edgeIterator(Direction.OUT); // this vertex itself, or its OUT edges
+            while (starts.hasNext()) {
+                final Element start = starts.next();
+                if (ElementHelper.idExists(start.id(), startStep.getIds())) { // filter by the start step's ids -- exact matching rules live in ElementHelper
+                    final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, startStep, 1l); // third argument is the long literal 1 (lowercase 'l' suffix)
+                    traverser.setStepId(future);
+                    traverser.detach();
+                    if (traverser.isHalted())
+                        haltedTraversers.add((Traverser.Admin) traverser); // already finished: keep it on the vertex
+                    else {
+                        voteToHalt = false;
+                        messenger.sendMessage(MessageScope.Global.of(vertex), new TraverserSet<>(traverser)); // addressed to this same vertex via a global scope
+                    }
+                }
+            }
+            memory.and(VOTE_TO_HALT, voteToHalt); // folds this vertex's vote into the shared flag via Memory.and
+        } else {
+            memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix)); // the executor's boolean result is used as this vertex's vote
+        }
+    }
+
+    @Override
+    public boolean terminate(final Memory memory) { // terminates when the flag is true; otherwise resets it for the next iteration
+        final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
+        if (voteToHalt) {
+            return true;
+        } else {
+            memory.set(VOTE_TO_HALT, true); // reset so the next round of execute() calls can vote afresh
+            return false;
+        }
+    }
+
+    @Override
+    public Set<String> getElementComputeKeys() { // HALTED_TRAVERSERS and TraversalSideEffects.SIDE_EFFECTS
+        return ELEMENT_COMPUTE_KEYS;
+    }
+
+    @Override
+    public Set<String> getMemoryComputeKeys() { // VOTE_TO_HALT only
+        return MEMORY_COMPUTE_KEYS;
+    }
+
+    @Override
+    public Set<MapReduce> getMapReducers() { // returns the internal mutable set directly, not a copy
+        return this.mapReducers;
+    }
+
+    @Override
+    public Optional<MessageCombiner<TraverserSet<?>>> getMessageCombiner() {
+        return (Optional) TraversalVertexProgramMessageCombiner.instance(); // raw Optional cast to fit the declared generic return type
+    }
+
+    @Override
+    public TraversalVertexProgram clone() throws CloneNotSupportedException { // clones the traversal and rebuilds the matrix for the copy
+        final TraversalVertexProgram clone = (TraversalVertexProgram) super.clone(); // NOTE(review): mapReducers is not copied below -- presumably shared with the original; confirm
+        clone.traversal = this.traversal.clone();
+        clone.traversalMatrix = new TraversalMatrix<>(clone.traversal); // the matrix must index the cloned traversal, not the original
+        return clone;
+    }
+
+    @Override
+    public String toString() { // uses the traversal's string form with its first and last characters removed
+        final String traversalString = this.traversal.toString().substring(1); // drop the first character
+        return StringFactory.vertexProgramString(this, traversalString.substring(0, traversalString.length() - 1)); // drop the last character
+    }
+
+    @Override
+    public Features getFeatures() { // declares that this program uses global message scopes and adds vertex properties
+        return new Features() {
+            @Override
+            public boolean requiresGlobalMessageScopes() {
+                return true;
+            }
+
+            @Override
+            public boolean requiresVertexPropertyAddition() {
+                return true;
+            }
+        };
+    }
+
+    //////////////
+
+    public static Builder build() { // entry point for configuring this program
+        return new Builder();
+    }
+
+    public static class Builder extends AbstractVertexProgramBuilder<Builder> { // each traversal(...) overload stores the supplier under TRAVERSAL_SUPPLIER
+
+        public Builder() {
+            super(TraversalVertexProgram.class);
+        }
+
+        public Builder traversal(final String scriptEngine, final String traversalScript) { // traversal given as a script for the named engine
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.SCRIPT, TRAVERSAL_SUPPLIER, new String[]{scriptEngine, traversalScript});
+            return this;
+        }
+
+        public Builder traversal(final String traversalScript) { // same as above, using the GREMLIN_GROOVY engine
+            return traversal(GREMLIN_GROOVY, traversalScript);
+        }
+
+        public Builder traversal(final Traversal.Admin traversal) { // traversal object wrapped in a TraversalSupplier
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.OBJECT, TRAVERSAL_SUPPLIER, new TraversalSupplier(traversal));
+            return this;
+        }
+
+        public Builder traversal(final Class<Supplier<Traversal.Admin>> traversalClass) { // traversal given as a supplier class
+            LambdaHolder.storeState(this.configuration, LambdaHolder.Type.CLASS, TRAVERSAL_SUPPLIER, traversalClass);
+            return this;
+        }
+
+        // TODO Builder resolveElements(boolean) to be fed to ComputerResultStep
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgramMessageCombiner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgramMessageCombiner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgramMessageCombiner.java
new file mode 100644
index 0000000..0adc693
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgramMessageCombiner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal;
+
+import com.tinkerpop.gremlin.process.computer.MessageCombiner;
+import com.tinkerpop.gremlin.process.util.TraverserSet;
+
+import java.util.Optional;
+
+/** {@link MessageCombiner} for {@link TraverserSet} messages that merges two sets into one; exposed as a shared instance via {@link #instance()}.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalVertexProgramMessageCombiner implements MessageCombiner<TraverserSet<?>> {
+
+    private static final Optional<TraversalVertexProgramMessageCombiner> INSTANCE = Optional.of(new TraversalVertexProgramMessageCombiner()); // single shared instance, pre-wrapped in an Optional
+
+    private TraversalVertexProgramMessageCombiner() { // private: use instance()
+
+    }
+
+    public TraverserSet<?> combine(final TraverserSet<?> messageA, final TraverserSet<?> messageB) { // merges messageB into messageA and returns messageA
+        messageA.addAll((TraverserSet) messageB); // raw cast; note that messageA is mutated in place
+        return messageA;
+    }
+
+    public static Optional<TraversalVertexProgramMessageCombiner> instance() { // always returns the same Optional
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
new file mode 100644
index 0000000..62856fc
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal;
+
+import com.tinkerpop.gremlin.process.Step;
+import com.tinkerpop.gremlin.process.TraversalSideEffects;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MessageScope;
+import com.tinkerpop.gremlin.process.computer.Messenger;
+import com.tinkerpop.gremlin.process.traversal.TraversalMatrix;
+import com.tinkerpop.gremlin.process.util.TraverserSet;
+import com.tinkerpop.gremlin.structure.*;
+import com.tinkerpop.gremlin.structure.util.detached.DetachedElement;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Static helper that executes the traversers messaged to a single vertex: each incoming traverser is
+ * either processed locally, sent on as a message to the vertex hosting its current object, or stored in
+ * the vertex's {@link TraversalVertexProgram#HALTED_TRAVERSERS} set once it has halted.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraverserExecutor {
+
+    /**
+     * Drains the traversers messaged to {@code vertex}, looping until none of them is still alive locally.
+     *
+     * @param vertex          the vertex being executed; its {@link TraversalVertexProgram#HALTED_TRAVERSERS} value receives halted traversers
+     * @param messenger       supplies the incoming traverser sets and accepts the traversers that leave this vertex
+     * @param traversalMatrix used to resolve a traverser's step by id and to obtain the traversal's side effects
+     * @return {@code true} if no traverser was sent as a message during this call, {@code false} otherwise
+     */
+    public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final TraversalMatrix<?, ?> traversalMatrix) {
+
+        final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
+        // starts as "halt"; flipped to false as soon as any traverser is messaged onward
+        final AtomicBoolean voteToHalt = new AtomicBoolean(true);
+
+        final TraverserSet<Object> aliveTraversers = new TraverserSet<>();
+        // gather incoming traversers into a traverser set and gain the 'weighted-set' optimization
+        final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
+        messenger.receiveMessages(MessageScope.Global.instance()).forEach(traverserSet -> {
+            traverserSet.forEach(traverser -> {
+                // give each received traverser the traversal's side effects and attach it to this vertex
+                traverser.setSideEffects(traversalSideEffects);
+                traverser.attach(vertex);
+                aliveTraversers.add((Traverser.Admin) traverser);
+            });
+        });
+
+        // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
+        final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+        while (!aliveTraversers.isEmpty()) {
+            // process all the local objects and send messages or store locally again
+            aliveTraversers.forEach(traverser -> {
+                if (traverser.get() instanceof Element || traverser.get() instanceof Property) {      // GRAPH OBJECT
+                    // if the element is remote, then message, else store it locally for re-processing
+                    final Vertex hostingVertex = TraverserExecutor.getHostingVertex(traverser.get());
+                    if (!vertex.equals(hostingVertex) || traverser.get() instanceof DetachedElement) { // TODO: why is the DetachedElement instanceof needed?
+                        voteToHalt.set(false);
+                        // detach before sending; the traverser travels alone in a fresh TraverserSet
+                        traverser.detach();
+                        messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser));
+                    } else
+                        toProcessTraversers.add(traverser);
+                } else                                                                              // STANDARD OBJECT
+                    toProcessTraversers.add(traverser);
+            });
+
+            // process local traversers and if alive, repeat, else halt.
+            aliveTraversers.clear();
+            toProcessTraversers.forEach(start -> {
+                // feed the traverser into the step it is currently at and drain everything that step emits
+                final Step<?, ?> step = traversalMatrix.getStepById(start.getStepId());
+                step.addStart((Traverser.Admin) start);
+                step.forEachRemaining(end -> {
+                    if (end.asAdmin().isHalted()) {
+                        // halted traversers are detached and kept in the vertex's halted set
+                        end.asAdmin().detach();
+                        haltedTraversers.add((Traverser.Admin) end);
+                    } else
+                        aliveTraversers.add((Traverser.Admin) end);
+                });
+            });
+
+            toProcessTraversers.clear();
+        }
+        return voteToHalt.get();
+    }
+
+    /**
+     * Resolves the vertex that hosts a graph object: a vertex hosts itself, an edge is hosted by the
+     * first vertex returned for {@link Direction#OUT}, and a property is hosted by whatever hosts the
+     * element it belongs to.
+     *
+     * @param object a vertex, edge or property
+     * @return the hosting vertex
+     * @throws IllegalStateException if the object (or an element reached through a property) is none of the above
+     */
+    private static Vertex getHostingVertex(final Object object) {
+        Object obj = object;
+        // loops only for properties, which are replaced by their owning element on each pass
+        while (true) {
+            if (obj instanceof Vertex)
+                return (Vertex) obj;
+            else if (obj instanceof Edge)
+                return ((Edge) obj).iterators().vertexIterator(Direction.OUT).next();
+            else if (obj instanceof Property)
+                obj = ((Property) obj).element();
+            else
+                throw new IllegalStateException("The host of the object is unknown: " + obj.toString() + ':' + obj.getClass().getCanonicalName());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/VertexTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/VertexTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/VertexTraversalSideEffects.java
new file mode 100644
index 0000000..bde04eb
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/VertexTraversalSideEffects.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal;
+
+import com.tinkerpop.gremlin.process.TraversalSideEffects;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * A read-only {@link TraversalSideEffects} view over the side-effect map stored on a vertex. Lookups
+ * ({@link #exists(String)}, {@link #get(String)}, {@link #keys()}) are served from that map; every
+ * mutating or sack/supplier related operation throws {@link UnsupportedOperationException}.
+ * <p>
+ * Instances are created through {@link #of(Vertex)}, which binds the view to a vertex.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VertexTraversalSideEffects implements TraversalSideEffects {
+
+    private static final String READ_ONLY_MESSAGE = VertexTraversalSideEffects.class.getSimpleName() + " is a read only facade to the underlying sideEffect at the local vertex";
+
+    // populated by setLocalVertex(Vertex); of(Vertex) always calls it before handing the instance out
+    private Map<String, Object> objectMap;
+
+    private VertexTraversalSideEffects() {
+
+    }
+
+    /**
+     * Builds the exception thrown by every unsupported operation. A new instance is created per call so
+     * that the stack trace identifies the actual caller and no mutable {@link Throwable} state is shared.
+     */
+    private static UnsupportedOperationException readOnly() {
+        return new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void registerSupplier(final String key, final Supplier supplier) {
+        throw readOnly();
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+        throw readOnly();
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void registerSupplierIfAbsent(final String key, final Supplier supplier) {
+        throw readOnly();
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <S> void setSack(final Supplier<S> initialValue, final Optional<UnaryOperator<S>> splitOperator) {
+        throw readOnly();
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <S> Optional<Supplier<S>> getSackInitialValue() {
+        throw readOnly();
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <S> Optional<UnaryOperator<S>> getSackSplitOperator() {
+        throw readOnly();
+    }
+
+    /**
+     * @param key the side-effect key
+     * @return {@code true} if the vertex's side-effect map contains {@code key}
+     */
+    @Override
+    public boolean exists(final String key) {
+        return this.objectMap.containsKey(key);
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void set(final String key, final Object value) {
+        throw readOnly();
+    }
+
+    /**
+     * Looks up a side effect in the vertex's map.
+     *
+     * @param key the side-effect key
+     * @return the non-null value stored under {@code key}
+     * @throws IllegalArgumentException declared by the interface; the exception actually thrown is the one
+     *                                  produced by {@code TraversalSideEffects.Exceptions.sideEffectDoesNotExist}
+     *                                  when the map yields {@code null} for {@code key}
+     */
+    @Override
+    public <V> V get(final String key) throws IllegalArgumentException {
+        final V value = (V) this.objectMap.get(key);
+        if (null != value)
+            return value;
+        else
+            throw TraversalSideEffects.Exceptions.sideEffectDoesNotExist(key);
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public <V> V getOrCreate(final String key, final Supplier<V> orCreate) {
+        throw readOnly();
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void remove(final String key) {
+        throw readOnly();
+    }
+
+    /**
+     * @return an unmodifiable view of the keys of the vertex's side-effect map
+     */
+    @Override
+    public Set<String> keys() {
+        return Collections.unmodifiableSet(this.objectMap.keySet());
+    }
+
+    /**
+     * Binds this view to the map held in the vertex's {@code SIDE_EFFECTS} property, or to an empty map
+     * when that property is absent.
+     *
+     * @param vertex the vertex whose side effects are exposed
+     */
+    @Override
+    public void setLocalVertex(final Vertex vertex) {
+        this.objectMap = vertex.<Map<String, Object>>property(SIDE_EFFECTS).orElse(Collections.emptyMap());
+    }
+
+    /** Unsupported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void mergeInto(final TraversalSideEffects sideEffects) {
+        throw readOnly();
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.traversalSideEffectsString(this);
+    }
+
+    /**
+     * @return a copy produced by {@code super.clone()}; the copy refers to the same underlying map
+     */
+    @Override
+    public VertexTraversalSideEffects clone() throws CloneNotSupportedException {
+        return (VertexTraversalSideEffects) super.clone();
+    }
+
+    /////
+
+    /**
+     * Creates a read-only side-effect view bound to the given vertex.
+     *
+     * @param vertex the vertex whose side effects are exposed
+     * @return the new view
+     */
+    public static TraversalSideEffects of(final Vertex vertex) {
+        final TraversalSideEffects sideEffects = new VertexTraversalSideEffects();
+        sideEffects.setLocalVertex(vertex);
+        return sideEffects;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
new file mode 100644
index 0000000..4085429
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal.step.map;
+
+import com.tinkerpop.gremlin.process.Step;
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.ComputerResult;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import com.tinkerpop.gremlin.process.computer.traversal.step.sideEffect.mapreduce.TraverserMapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.SideEffectCapStep;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.util.detached.Attachable;
+import com.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A step that emits the traversers produced by a graph computer run. The traversers come either from
+ * the memory entry written under {@link TraverserMapReduce#TRAVERSERS} or, when the computer traversal
+ * ends in a {@link SideEffectCapStep}, from a single traverser built out of the capped side effects.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ComputerResultStep<S> extends AbstractStep<S, S> {
+
+    private final Iterator<Traverser.Admin<S>> traversers;
+    private final Graph graph;
+    private final Memory memory;
+    private final Traversal.Admin<?, ?> computerTraversal;
+    private final boolean attachElements; // should be part of graph computer with "propagate properties"
+
+    /**
+     * @param traversal              the traversal this step belongs to; every memory entry is copied into its side effects
+     * @param result                 the computer result supplying the graph and the memory
+     * @param traversalVertexProgram the program whose traversal was executed on the computer
+     * @param attachElements         whether {@link Attachable} results are attached to the result graph when emitted
+     */
+    public ComputerResultStep(final Traversal.Admin traversal, final ComputerResult result, final TraversalVertexProgram traversalVertexProgram, final boolean attachElements) {
+        super(traversal);
+        this.graph = result.graph();
+        this.memory = result.memory();
+        this.attachElements = attachElements;
+        // copy each memory entry into the side effects of the enclosing traversal
+        this.memory.keys().forEach(memoryKey -> traversal.getSideEffects().set(memoryKey, this.memory.get(memoryKey)));
+        this.computerTraversal = traversalVertexProgram.getTraversal();
+
+        final Step endStep = this.computerTraversal.getEndStep();
+        if (!(endStep instanceof SideEffectCapStep)) {
+            // ordinary traversal: the traversers were stored in memory under TRAVERSERS
+            this.traversers = this.memory.get(TraverserMapReduce.TRAVERSERS);
+        } else {
+            final List<String> capKeys = ((SideEffectCapStep<?, ?>) endStep).getSideEffectKeys();
+            if (capKeys.size() != 1) {
+                // several capped keys: emit one traverser holding a key -> value map
+                final Map<String, Object> capValues = new HashMap<>();
+                capKeys.forEach(capKey -> capValues.put(capKey, this.memory.get(capKey)));
+                this.traversers = IteratorUtils.of(this.computerTraversal.getTraverserGenerator().generate((S) capValues, this, 1L));
+            } else {
+                // exactly one capped key: emit one traverser holding that side effect directly
+                this.traversers = IteratorUtils.of(this.computerTraversal.getTraverserGenerator().generate(this.memory.get(capKeys.get(0)), this, 1L));
+            }
+        }
+    }
+
+    /**
+     * @return the next result traverser; when attaching is enabled and its object is {@link Attachable},
+     * the object is first replaced by the result of attaching it to the result graph
+     */
+    @Override
+    public Traverser<S> processNextStart() {
+        final Traverser.Admin<S> next = this.traversers.next();
+        if (this.attachElements) {
+            final S current = next.get();
+            if (current instanceof Attachable)
+                next.set((S) ((Attachable) current).attach(this.graph));
+        }
+        return next;
+    }
+
+    @Override
+    public String toString() {
+        return TraversalHelper.makeStepString(this, this.computerTraversal);
+    }
+
+    /**
+     * @return the traverser requirements of the computer traversal
+     */
+    @Override
+    public Set<TraverserRequirement> getRequirements() {
+        return this.computerTraversal.getTraverserRequirements();
+    }
+
+    /**
+     * @return the traversal that was executed on the graph computer
+     */
+    public Traversal.Admin<?, ?> getComputerTraversal() {
+        return this.computerTraversal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
new file mode 100644
index 0000000..2127df5
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.traversal.step.sideEffect.mapreduce;
+
+import com.tinkerpop.gremlin.process.Step;
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.KeyValue;
+import com.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import com.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
+import com.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.ComparatorHolder;
+import com.tinkerpop.gremlin.process.traversal.step.Reducing;
+import com.tinkerpop.gremlin.process.util.TraverserSet;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+/**
+ * Map/reduce job that collects the halted traversers stored on each vertex under
+ * {@link TraversalVertexProgram#HALTED_TRAVERSERS}. If the traversal's end step is a
+ * {@link ComparatorHolder} the map keys are sorted with its chained comparators; if it is a
+ * {@link Reducing} step a reduce stage folds the values with its reducer.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraverserMapReduce extends StaticMapReduce<Comparable, Object, Comparable, Object, Iterator<Object>> {
+
+    public static final String TRAVERSERS = Graph.Hidden.hide("traversers");
+
+    private Traversal.Admin<?, ?> traversal;
+    private Optional<Comparator<Comparable>> comparator = Optional.empty();
+    private Optional<Reducing.Reducer> reducer = Optional.empty();
+
+    private TraverserMapReduce() {
+    }
+
+    /**
+     * @param traversalEndStep the end step of the traversal; it supplies the traversal and, if applicable, the comparator and reducer
+     */
+    public TraverserMapReduce(final Step traversalEndStep) {
+        this.traversal = traversalEndStep.getTraversal();
+        this.configureFrom(traversalEndStep);
+    }
+
+    /**
+     * Rebuilds the traversal from the configuration and re-derives comparator and reducer from its end step.
+     */
+    @Override
+    public void loadState(final Configuration configuration) {
+        this.traversal = TraversalVertexProgram.getTraversalSupplier(configuration).get();
+        this.configureFrom(this.traversal.getEndStep());
+    }
+
+    // derives the optional comparator and reducer from the capabilities of the given end step
+    private void configureFrom(final Step endStep) {
+        final boolean holdsComparators = endStep instanceof ComparatorHolder;
+        final boolean reduces = endStep instanceof Reducing;
+        this.comparator = Optional.ofNullable(holdsComparators ? GraphComputerHelper.chainComparators(((ComparatorHolder) endStep).getComparators()) : null);
+        this.reducer = Optional.ofNullable(reduces ? ((Reducing) endStep).getReducer() : null);
+    }
+
+    /**
+     * @return {@code true} for the map stage always, and for the reduce stage only when a reducer is present
+     */
+    @Override
+    public boolean doStage(final Stage stage) {
+        if (stage.equals(Stage.MAP))
+            return true;
+        return stage.equals(Stage.REDUCE) && this.reducer.isPresent();
+    }
+
+    /**
+     * Emits every halted traverser of the vertex: keyed by itself when a comparator is present,
+     * otherwise through the emitter's single-argument {@code emit}.
+     */
+    @Override
+    public void map(final Vertex vertex, final MapEmitter<Comparable, Object> emitter) {
+        final boolean sortable = this.comparator.isPresent();
+        vertex.<TraverserSet<?>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> {
+            if (sortable)
+                emitter.emit(traverser, traverser);
+            else
+                emitter.emit(traverser);
+        }));
+    }
+
+    /**
+     * Folds all values into the reducer's seed and emits one traverser carrying the final value.
+     * The reducer is fed the raw value or the traverser's object depending on {@code onTraverser()}.
+     */
+    @Override
+    public void reduce(final Comparable key, final Iterator<Object> values, final ReduceEmitter<Comparable, Object> emitter) {
+        final Reducing.Reducer activeReducer = this.reducer.get();
+        Object accumulator = activeReducer.getSeedSupplier().get();
+        final BiFunction folder = activeReducer.getBiFunction();
+        final boolean feedTraversers = activeReducer.onTraverser();
+        while (values.hasNext()) {
+            final Object next = values.next();
+            accumulator = folder.apply(accumulator, feedTraversers ? next : ((Traverser) next).get());
+        }
+        emitter.emit(key, this.traversal.getTraverserGenerator().generate(Reducing.FinalGet.tryFinalGet(accumulator), (Step) this.traversal.getEndStep(), 1L));
+    }
+
+    /**
+     * @return the comparator used to sort map keys, if the end step provided one
+     */
+    @Override
+    public Optional<Comparator<Comparable>> getMapKeySort() {
+        return this.comparator;
+    }
+
+    /**
+     * @return with a reducer and no key/values: a single traverser holding the reducer's seed;
+     * otherwise an iterator over the values of the given key/value pairs
+     */
+    @Override
+    public Iterator<Object> generateFinalResult(final Iterator<KeyValue<Comparable, Object>> keyValues) {
+        final boolean emptyReduction = this.reducer.isPresent() && !keyValues.hasNext();
+        if (emptyReduction)
+            return IteratorUtils.of(this.traversal.getTraverserGenerator().generate(this.reducer.get().getSeedSupplier().get(), (Step) this.traversal.getEndStep(), 1L));
+
+        // lazily unwrap the value of each key/value pair
+        return new Iterator<Object>() {
+            @Override
+            public boolean hasNext() {
+                return keyValues.hasNext();
+            }
+
+            @Override
+            public Object next() {
+                return keyValues.next().getValue();
+            }
+        };
+    }
+
+    @Override
+    public String getMemoryKey() {
+        return TRAVERSERS;
+    }
+
+    @Override
+    public int hashCode() {
+        final String identity = this.getClass().getCanonicalName() + TRAVERSERS;
+        return identity.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object object) {
+        return GraphComputerHelper.areEqual(this, object);
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.mapReduceString(this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/AbstractVertexProgramBuilder.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/AbstractVertexProgramBuilder.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/AbstractVertexProgramBuilder.java
new file mode 100644
index 0000000..b0d1746
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/AbstractVertexProgramBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import com.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.commons.configuration.BaseConfiguration;
+
+/**
+ * Base class for {@link VertexProgram.Builder} implementations. It accumulates settings in a
+ * {@link BaseConfiguration} and creates the program from that configuration.
+ *
+ * @param <B> the concrete builder type returned by the fluent methods
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class AbstractVertexProgramBuilder<B extends VertexProgram.Builder> implements VertexProgram.Builder {
+
+    public static final String GREMLIN_GROOVY = "gremlin-groovy";
+
+    protected final BaseConfiguration configuration = new BaseConfiguration();
+
+    public AbstractVertexProgramBuilder() {
+    }
+
+    /**
+     * @param vertexProgramClass the program class whose name is stored under {@link VertexProgram#VERTEX_PROGRAM}
+     */
+    public AbstractVertexProgramBuilder(final Class<? extends VertexProgram> vertexProgramClass) {
+        final String programClassName = vertexProgramClass.getName();
+        this.configuration.setProperty(VertexProgram.VERTEX_PROGRAM, programClassName);
+    }
+
+    /*@Override
+    public B graph(final Graph graph) {
+        this.configuration.setProperty(Graph.GRAPH, graph.getClass().getName());
+        return (B) this;
+    }*/
+
+    /**
+     * Stores alternating key/value arguments in the configuration after they have been checked by
+     * {@code VertexProgramHelper.legalConfigurationKeyValueArray}.
+     *
+     * @param keyValues key, value, key, value, ... where every key is cast to {@link String}
+     * @return this builder
+     */
+    @Override
+    public B configure(final Object... keyValues) {
+        VertexProgramHelper.legalConfigurationKeyValueArray(keyValues);
+        int index = 0;
+        while (index < keyValues.length) {
+            final String key = (String) keyValues[index];
+            final Object value = keyValues[index + 1];
+            this.configuration.setProperty(key, value);
+            index += 2;
+        }
+        return (B) this;
+    }
+
+    /**
+     * @return the vertex program created from the accumulated configuration
+     */
+    @Override
+    public <P extends VertexProgram> P create() {
+        final VertexProgram program = VertexProgram.createVertexProgram(this.configuration);
+        return (P) program;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerDataStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerDataStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerDataStrategy.java
new file mode 100644
index 0000000..8bc4239
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerDataStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import com.tinkerpop.gremlin.process.computer.VertexProgram;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.structure.VertexProperty;
+import com.tinkerpop.gremlin.structure.strategy.GraphStrategy;
+import com.tinkerpop.gremlin.structure.strategy.StrategyContext;
+import com.tinkerpop.gremlin.structure.strategy.StrategyGraph;
+import com.tinkerpop.gremlin.structure.strategy.StrategyVertex;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+import com.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * A {@link GraphStrategy} that hides a given set of compute keys when the properties or keys of a
+ * vertex are listed without naming specific keys.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ComputerDataStrategy implements GraphStrategy {
+
+    private final Set<String> elementComputeKeys;
+
+    /**
+     * @param elementComputeKeys the keys to hide; the set is stored as passed, not copied
+     */
+    public ComputerDataStrategy(final Set<String> elementComputeKeys) {
+        this.elementComputeKeys = elementComputeKeys;
+    }
+
+    /**
+     * With no keys requested, the delegate's properties are filtered to drop those whose key is a
+     * compute key; with explicit keys the delegate's result is returned unchanged.
+     */
+    @Override
+    public <V> UnaryOperator<Function<String[], Iterator<VertexProperty<V>>>> getVertexIteratorsPropertyIteratorStrategy(final StrategyContext<StrategyVertex> ctx, final GraphStrategy strategyComposer) {
+        return delegate -> keys -> {
+            if (keys.length != 0)
+                return delegate.apply(keys);
+            return IteratorUtils.filter(delegate.apply(keys), property -> !this.elementComputeKeys.contains(property.key()));
+        };
+    }
+
+    /**
+     * Values are derived from the current vertex's property iterator rather than from the delegate,
+     * which is not invoked.
+     */
+    @Override
+    public <V> UnaryOperator<Function<String[], Iterator<V>>> getVertexIteratorsValueIteratorStrategy(final StrategyContext<StrategyVertex> ctx, final GraphStrategy strategyComposer) {
+        return unusedDelegate -> keys -> {
+            return IteratorUtils.map(ctx.getCurrent().iterators().<V>propertyIterator(keys), vertexProperty -> vertexProperty.value());
+        };
+    }
+
+    /**
+     * The delegate's key set is copied into a new {@link HashSet} without the compute keys.
+     */
+    @Override
+    public UnaryOperator<Supplier<Set<String>>> getVertexKeysStrategy(final StrategyContext<StrategyVertex> ctx, final GraphStrategy strategyComposer) {
+        return delegate -> () -> {
+            final Iterator<String> visibleKeys = IteratorUtils.filter(delegate.get().iterator(), key -> !this.elementComputeKeys.contains(key));
+            return IteratorUtils.fill(visibleKeys, new HashSet<>());
+        };
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.graphStrategyString(this);
+    }
+
+    /**
+     * @return the graph wrapped with a strategy hiding the element compute keys of {@code vertexProgram}
+     */
+    public static StrategyGraph wrapGraph(final Graph graph, final VertexProgram<?> vertexProgram) {
+        final ComputerDataStrategy strategy = new ComputerDataStrategy(vertexProgram.getElementComputeKeys());
+        return graph.strategy(strategy);
+    }
+
+    /**
+     * @return the vertex wrapped together with its graph, the graph being wrapped as in {@link #wrapGraph(Graph, VertexProgram)}
+     */
+    public static StrategyVertex wrapVertex(final Vertex vertex, final VertexProgram<?> vertexProgram) {
+        return new StrategyVertex(vertex, wrapGraph(vertex.graph(), vertexProgram));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultComputerResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultComputerResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultComputerResult.java
new file mode 100644
index 0000000..705d23f
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/DefaultComputerResult.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import com.tinkerpop.gremlin.process.computer.ComputerResult;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+
+/**
+ * The result of a {@link com.tinkerpop.gremlin.process.computer.GraphComputer}'s computation. It is returned in a
+ * {@link java.util.concurrent.Future} by GraphComputer.submit().
+ * A GraphComputer computation yields two things: an updated view of the {@link Graph} that was computed on, and the
+ * computational sideEffects, which are held in a {@link com.tinkerpop.gremlin.process.computer.Memory}.
+ * <p>
+ * This implementation simply holds on to both references; {@link #close()} does nothing.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class DefaultComputerResult implements ComputerResult {
+
+    /** The graph view handed to the constructor. */
+    protected final Graph graph;
+    /** The memory handed to the constructor. */
+    protected final Memory memory;
+
+    /**
+     * @param graph  the graph view to expose through {@link #graph()}
+     * @param memory the memory to expose through {@link #memory()}
+     */
+    public DefaultComputerResult(final Graph graph, final Memory memory) {
+        this.memory = memory;
+        this.graph = graph;
+    }
+
+    /**
+     * @return the graph supplied at construction time
+     */
+    @Override
+    public Graph graph() {
+        return graph;
+    }
+
+    /**
+     * @return the memory supplied at construction time
+     */
+    @Override
+    public Memory memory() {
+        return memory;
+    }
+
+    /**
+     * No-op: this result holds nothing that needs to be released.
+     */
+    @Override
+    public void close() {
+        // intentionally empty
+    }
+
+    /**
+     * @return the text produced by {@code StringFactory.computeResultString(this)}
+     */
+    @Override
+    public String toString() {
+        final String rendered = StringFactory.computeResultString(this);
+        return rendered;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
new file mode 100644
index 0000000..153c2c7
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/GraphComputerHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.GraphComputer;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.VertexProgram;
+import com.tinkerpop.gremlin.structure.Graph;
+
+import java.lang.reflect.Method;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Static helpers for validating vertex programs and compute arguments, comparing {@link MapReduce} jobs and
+ * lifting value comparators to {@link Traverser} comparators.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphComputerHelper {
+
+    private GraphComputerHelper() {
+        // static utility class -- not instantiable
+    }
+
+    /**
+     * Verifies that a vertex program can run on a graph computer.
+     * <p>
+     * The memory compute keys of the program must contain neither {@code null} nor the empty string. Afterwards
+     * every public {@code requiresXxx} method of {@link VertexProgram.Features} is paired, via reflection, with the
+     * {@code supportsXxx} method of {@link GraphComputer.Features}; a feature that is required but not supported
+     * is rejected.
+     *
+     * @param computer      the computer whose features are consulted
+     * @param vertexProgram the program to validate
+     * @throws IllegalStateException if a reflective call fails or a required feature is not supported
+     */
+    public static void validateProgramOnComputer(final GraphComputer computer, final VertexProgram vertexProgram) {
+        if (vertexProgram.getMemoryComputeKeys().contains(null)) {
+            throw Memory.Exceptions.memoryKeyCanNotBeNull();
+        }
+        if (vertexProgram.getMemoryComputeKeys().contains("")) {
+            throw Memory.Exceptions.memoryKeyCanNotBeEmpty();
+        }
+
+        final GraphComputer.Features offered = computer.features();
+        final VertexProgram.Features demanded = vertexProgram.getFeatures();
+
+        for (final Method requirement : VertexProgram.Features.class.getMethods()) {
+            final String requirementName = requirement.getName();
+            if (!requirementName.startsWith("requires")) {
+                continue;
+            }
+            final boolean supported;
+            final boolean required;
+            try {
+                // requiresXxx on the program side maps to supportsXxx on the computer side
+                final Method support = GraphComputer.Features.class.getMethod(requirementName.replace("requires", "supports"));
+                supported = (boolean) support.invoke(offered);
+                required = (boolean) requirement.invoke(demanded);
+            } catch (final Exception e) {
+                throw new IllegalStateException("A reflection exception has occurred: " + e.getMessage(), e);
+            }
+            if (required && !supported) {
+                throw new IllegalStateException("The vertex program can not be executed on the graph computer: " + requirementName);
+            }
+        }
+    }
+
+    /**
+     * Rejects argument lists that name more than one graph computer class.
+     *
+     * @param graphComputerClass zero or one graph computer classes
+     */
+    public static void validateComputeArguments(Class... graphComputerClass) {
+        final int supplied = graphComputerClass.length;
+        if (supplied > 1) {
+            throw Graph.Exceptions.onlyOneOrNoGraphComputerClass();
+        }
+    }
+
+    /**
+     * Equality check for {@link MapReduce} jobs: two jobs are equal when they are of the same class and use the
+     * same memory key.
+     *
+     * @param a the map reduce job, must not be {@code null}
+     * @param b the object to compare against, must not be {@code null}
+     * @return {@code true} if {@code b} is a {@link MapReduce} of the same class with an equal memory key
+     */
+    public static boolean areEqual(final MapReduce a, final Object b) {
+        if (null == a) {
+            throw Graph.Exceptions.argumentCanNotBeNull("a");
+        }
+        if (null == b) {
+            throw Graph.Exceptions.argumentCanNotBeNull("b");
+        }
+        if (!(b instanceof MapReduce)) {
+            return false;
+        }
+
+        final MapReduce other = (MapReduce) b;
+        if (!a.getClass().equals(other.getClass())) {
+            return false;
+        }
+        return a.getMemoryKey().equals(other.getMemoryKey());
+    }
+
+    /**
+     * Turns a list of value comparators into a single comparator over traversers.
+     * <p>
+     * With no comparators the traversers are compared with their own {@code compareTo}. Otherwise each comparator
+     * is applied to the values returned by {@code Traverser.get()} and the comparators are chained in list order,
+     * later ones only deciding when all earlier ones report equality.
+     *
+     * @param comparators the value comparators, in order of precedence
+     * @return the combined traverser comparator
+     */
+    public static <T> Comparator<Traverser<T>> chainComparators(final List<Comparator<T>> comparators) {
+        if (comparators.size() == 0) {
+            return (left, right) -> left.compareTo(right);
+        }
+
+        Comparator<Traverser<T>> chained = null;
+        for (final Comparator<T> valueComparator : comparators) {
+            final Comparator<Traverser<T>> lifted = (left, right) -> valueComparator.compare(left.get(), right.get());
+            chained = (null == chained) ? lifted : chained.thenComparing(lifted);
+        }
+        return chained;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java
new file mode 100644
index 0000000..2979487
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.Set;
+
+/**
+ * A read-only wrapper around another {@link Memory}. Every read is forwarded to the wrapped memory, while every
+ * mutating operation throws {@code Memory.Exceptions.memoryIsCurrentlyImmutable()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ImmutableMemory implements Memory.Admin {
+
+    private final Memory baseMemory;
+
+    /**
+     * @param baseMemory the memory that all reads are forwarded to
+     */
+    public ImmutableMemory(final Memory baseMemory) {
+        this.baseMemory = baseMemory;
+    }
+
+    // ---- reads: forwarded to the wrapped memory ----
+
+    @Override
+    public Set<String> keys() {
+        return baseMemory.keys();
+    }
+
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        return baseMemory.get(key);
+    }
+
+    @Override
+    public int getIteration() {
+        return baseMemory.getIteration();
+    }
+
+    @Override
+    public long getRuntime() {
+        return baseMemory.getRuntime();
+    }
+
+    // ---- writes: always rejected ----
+
+    @Override
+    public void set(final String key, final Object value) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public long incr(final String key, final long delta) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public boolean and(final String key, final boolean bool) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public boolean or(final String key, final boolean bool) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public void incrIteration() {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public void setIteration(final int iteration) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    @Override
+    public void setRuntime(final long runtime) {
+        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+    }
+
+    /**
+     * @return the text produced by {@code StringFactory.memoryString(this)}
+     */
+    @Override
+    public String toString() {
+        return StringFactory.memoryString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/LambdaHolder.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/LambdaHolder.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/LambdaHolder.java
new file mode 100644
index 0000000..21eb6ce
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/LambdaHolder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Holds a lambda-like object together with the information needed to write it to, and restore it from, a
+ * {@link Configuration}. Two keys are used, both derived from a caller supplied prefix: {@code <prefix>.type}
+ * stores the {@link Type} name and {@code <prefix>.object} stores the type-specific payload.
+ *
+ * @param <S> the type the held object is exposed as by {@link #get()}
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class LambdaHolder<S> {
+
+    /**
+     * How the held object is represented in the configuration.
+     */
+    public enum Type {
+        /** The object itself is serialized through {@code VertexProgramHelper}. */
+        OBJECT,
+        /** Only the class name is stored; the object is re-created with the class's no-arg constructor. */
+        CLASS,
+        /** A {@code String[]} is serialized; its first two elements are handed to {@code ScriptEngineLambda}. */
+        SCRIPT
+    }
+
+    private Type type;
+    private Object configObject;
+    private Object realObject;
+    private String configKeyPrefix;
+
+    private static final String DOT_TYPE = ".type";
+    private static final String DOT_OBJECT = ".object";
+
+
+    /**
+     * Returns the usable object. It is only populated by {@link #loadState(Configuration, String)}; a holder
+     * obtained from the static {@code storeState} factory returns {@code null} here.
+     *
+     * @return the restored object, unchecked-cast to {@code S}
+     */
+    @SuppressWarnings("unchecked")
+    public S get() {
+        return (S) this.realObject;
+    }
+
+    private LambdaHolder() {
+
+    }
+
+    /**
+     * Creates a holder for {@code configObject} and immediately writes it to the configuration.
+     *
+     * @param configuration   the configuration to write to
+     * @param type            the representation to use
+     * @param configKeyPrefix the prefix of the two configuration keys
+     * @param configObject    the payload; must be a {@code String[]} for {@link Type#SCRIPT} and a {@link Class}
+     *                        for {@link Type#CLASS}
+     * @return the new holder
+     * @throws IllegalArgumentException if the payload does not match the type
+     */
+    public static <T> LambdaHolder<T> storeState(final Configuration configuration, final Type type, final String configKeyPrefix, final Object configObject) {
+        if (type.equals(Type.SCRIPT) && !(configObject instanceof String[]))
+            throw new IllegalArgumentException("Script lambda types must have a String[] configuration object");
+        if (type.equals(Type.CLASS) && !(configObject instanceof Class))
+            throw new IllegalArgumentException("Class lambda types must have a Class configuration object");
+
+        final LambdaHolder<T> lambdaHolder = new LambdaHolder<>();
+        lambdaHolder.type = type;
+        lambdaHolder.configKeyPrefix = configKeyPrefix;
+        lambdaHolder.configObject = configObject;
+        lambdaHolder.storeState(configuration);
+        return lambdaHolder;
+    }
+
+    /**
+     * Restores a holder from the configuration.
+     *
+     * @param configuration   the configuration to read from
+     * @param configKeyPrefix the prefix of the two configuration keys
+     * @return the restored holder, or {@code null} if either the type key or the object key is missing
+     * @throws IllegalStateException if a {@link Type#CLASS} entry cannot be loaded or instantiated
+     */
+    public static <T> LambdaHolder<T> loadState(final Configuration configuration, final String configKeyPrefix) {
+        if (!configuration.containsKey(configKeyPrefix.concat(DOT_TYPE)))
+            return null;
+        if (!configuration.containsKey(configKeyPrefix.concat(DOT_OBJECT)))
+            return null;
+
+        final LambdaHolder<T> lambdaHolder = new LambdaHolder<>();
+        lambdaHolder.configKeyPrefix = configKeyPrefix;
+        lambdaHolder.type = Type.valueOf(configuration.getString(lambdaHolder.configKeyPrefix.concat(DOT_TYPE)));
+        if (lambdaHolder.type.equals(Type.OBJECT)) {
+            lambdaHolder.configObject = VertexProgramHelper.deserialize(configuration, lambdaHolder.configKeyPrefix.concat(DOT_OBJECT));
+            lambdaHolder.realObject = lambdaHolder.configObject;
+        } else if (lambdaHolder.type.equals(Type.CLASS)) {
+            try {
+                // Class.forName expects the binary name, which is what storeState writes
+                final Class<?> klass = Class.forName(configuration.getString(lambdaHolder.configKeyPrefix.concat(DOT_OBJECT)));
+                lambdaHolder.configObject = klass;
+                lambdaHolder.realObject = klass.newInstance();
+            } catch (final Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        } else { // SCRIPT
+            final String[] script = VertexProgramHelper.deserialize(configuration, lambdaHolder.configKeyPrefix.concat(DOT_OBJECT));
+            lambdaHolder.configObject = script;
+            // NOTE(review): requires at least two elements; presumably (engine name, script text) -- confirm against ScriptEngineLambda
+            lambdaHolder.realObject = new ScriptEngineLambda(script[0], script[1]);
+        }
+        return lambdaHolder;
+    }
+
+
+    /**
+     * Writes this holder's type and payload to the configuration.
+     * <p>
+     * For {@link Type#CLASS} the class is stored by its binary name ({@link Class#getName()}) because that is the
+     * form {@link Class#forName(String)} needs when the state is loaded again. The canonical name is not usable
+     * for this: it separates nested classes with {@code '.'} instead of {@code '$'} and is {@code null} for local
+     * and anonymous classes.
+     *
+     * @param configuration the configuration to write to
+     */
+    public void storeState(final Configuration configuration) {
+        configuration.setProperty(this.configKeyPrefix.concat(DOT_TYPE), this.type.name());
+        if (this.type.equals(Type.OBJECT))
+            VertexProgramHelper.serialize(this.configObject, configuration, this.configKeyPrefix.concat(DOT_OBJECT));
+        else if (this.type.equals(Type.CLASS))
+            configuration.setProperty(this.configKeyPrefix.concat(DOT_OBJECT), ((Class<?>) this.configObject).getName());
+        else
+            VertexProgramHelper.serialize(this.configObject, configuration, this.configKeyPrefix.concat(DOT_OBJECT));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
new file mode 100644
index 0000000..6dc56fe
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import com.tinkerpop.gremlin.process.computer.GraphComputer;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.computer.Memory;
+import com.tinkerpop.gremlin.process.computer.VertexProgram;
+import com.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link java.util.HashMap}-backed, serializable {@link Memory.Admin}.
+ * <p>
+ * The aggregating operations ({@link #incr}, {@link #and}, {@link #or}) only accept keys that were registered
+ * beforehand through {@link #addVertexProgramMemoryComputeKeys} or {@link #addMapReduceMemoryKey};
+ * {@link #set} performs no such check.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MapMemory implements Memory.Admin, Serializable {
+
+    private long runtime = 0L;
+    // starts at -1; the first incrIteration() moves it to 0
+    private int iteration = -1;
+    private final Map<String, Object> memoryMap = new HashMap<>();
+    private final Set<String> memoryComputeKeys = new HashSet<>();
+
+    /**
+     * Registers all memory compute keys of the given vertex program.
+     *
+     * @param vertexProgram the program whose memory compute keys become valid for aggregation
+     */
+    public void addVertexProgramMemoryComputeKeys(final VertexProgram<?> vertexProgram) {
+        memoryComputeKeys.addAll(vertexProgram.getMemoryComputeKeys());
+    }
+
+    /**
+     * Registers the memory key of the given map reduce job.
+     *
+     * @param mapReduce the job whose memory key becomes valid for aggregation
+     */
+    public void addMapReduceMemoryKey(final MapReduce mapReduce) {
+        memoryComputeKeys.add(mapReduce.getMemoryKey());
+    }
+
+    /**
+     * @return the key set of the backing map
+     */
+    @Override
+    public Set<String> keys() {
+        return memoryMap.keySet();
+    }
+
+    /**
+     * @return the value stored under {@code key}; a missing (or {@code null}) value is reported through
+     * {@code Memory.Exceptions.memoryDoesNotExist(key)}
+     */
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        final R stored = (R) memoryMap.get(key);
+        if (null != stored) {
+            return stored;
+        }
+        throw Memory.Exceptions.memoryDoesNotExist(key);
+    }
+
+    /**
+     * Stores {@code value} under {@code key} without any validation.
+     */
+    @Override
+    public void set(final String key, Object value) {
+        memoryMap.put(key, value);
+    }
+
+    @Override
+    public int getIteration() {
+        return iteration;
+    }
+
+    @Override
+    public long getRuntime() {
+        return runtime;
+    }
+
+    /**
+     * Adds {@code delta} to the value under {@code key}, or stores {@code delta} if the key has no entry yet.
+     *
+     * @return the value now stored under {@code key}
+     */
+    @Override
+    public long incr(final String key, final long delta) {
+        checkKeyValue(key, delta);
+        if (!memoryMap.containsKey(key)) {
+            memoryMap.put(key, delta);
+            return delta;
+        }
+        final long sum = (long) memoryMap.get(key) + delta;
+        memoryMap.put(key, sum);
+        return sum;
+    }
+
+    /**
+     * Logically ANDs {@code bool} into the value under {@code key}, or stores {@code bool} if the key has no
+     * entry yet.
+     *
+     * @return the value now stored under {@code key}
+     */
+    @Override
+    public boolean and(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        if (!memoryMap.containsKey(key)) {
+            memoryMap.put(key, bool);
+            return bool;
+        }
+        final boolean conjunction = (boolean) memoryMap.get(key) && bool;
+        memoryMap.put(key, conjunction);
+        return conjunction;
+    }
+
+    /**
+     * Logically ORs {@code bool} into the value under {@code key}, or stores {@code bool} if the key has no
+     * entry yet.
+     *
+     * @return the value now stored under {@code key}
+     */
+    @Override
+    public boolean or(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        if (!memoryMap.containsKey(key)) {
+            memoryMap.put(key, bool);
+            return bool;
+        }
+        final boolean disjunction = (boolean) memoryMap.get(key) || bool;
+        memoryMap.put(key, disjunction);
+        return disjunction;
+    }
+
+    /**
+     * @return the text produced by {@code StringFactory.memoryString(this)}
+     */
+    @Override
+    public String toString() {
+        return StringFactory.memoryString(this);
+    }
+
+    @Override
+    public void incrIteration() {
+        iteration += 1;
+    }
+
+    @Override
+    public void setIteration(final int iteration) {
+        this.iteration = iteration;
+    }
+
+    @Override
+    public void setRuntime(long runtime) {
+        this.runtime = runtime;
+    }
+
+    /**
+     * Guards the aggregating operations: the key must be a registered memory compute key and the value must pass
+     * {@code MemoryHelper.validateValue}.
+     */
+    private void checkKeyValue(final String key, final Object value) {
+        final boolean registered = memoryComputeKeys.contains(key);
+        if (!registered) {
+            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+        }
+        MemoryHelper.validateValue(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MemoryHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MemoryHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MemoryHelper.java
new file mode 100644
index 0000000..dcda10e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MemoryHelper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import com.tinkerpop.gremlin.process.computer.Memory;
+
+/**
+ * Validation helpers for {@link Memory} keys and values.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MemoryHelper {
+
+    // NOTE(review): this class only offers static methods, yet its constructor is public; it is left
+    // untouched here because narrowing it would change the public interface.
+    public MemoryHelper() {
+    }
+
+    /**
+     * Rejects {@code null} memory values.
+     *
+     * @param value the value to check
+     * @throws IllegalArgumentException as declared; the thrown exception comes from
+     *                                  {@code Memory.Exceptions.memoryValueCanNotBeNull()}
+     */
+    public static void validateValue(final Object value) throws IllegalArgumentException {
+        if (value == null) {
+            throw Memory.Exceptions.memoryValueCanNotBeNull();
+        }
+    }
+
+    /**
+     * Rejects {@code null} and empty memory keys; the {@code null} check comes first.
+     *
+     * @param key the key to check
+     * @throws IllegalArgumentException as declared; the thrown exception comes from
+     *                                  {@code Memory.Exceptions.memoryKeyCanNotBeNull()} or
+     *                                  {@code Memory.Exceptions.memoryKeyCanNotBeEmpty()}
+     */
+    public static void validateKey(final String key) throws IllegalArgumentException {
+        if (key == null) {
+            throw Memory.Exceptions.memoryKeyCanNotBeNull();
+        }
+        if (key.isEmpty()) {
+            throw Memory.Exceptions.memoryKeyCanNotBeEmpty();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ScriptEngineCache.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ScriptEngineCache.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ScriptEngineCache.java
new file mode 100644
index 0000000..1d99870
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ScriptEngineCache.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.computer.util;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A process-wide cache of {@link ScriptEngine} instances keyed by engine name. Each engine is looked up through a
+ * shared {@link ScriptEngineManager} the first time its name is requested and is reused afterwards.
+ *
+ * @author Daniel Kuppitz (daniel at thinkaurelius.com)
+ */
+public final class ScriptEngineCache {
+
+    public final static String DEFAULT_SCRIPT_ENGINE = "gremlin-groovy";
+
+    private final static ScriptEngineManager SCRIPT_ENGINE_MANAGER = new ScriptEngineManager();
+    private final static Map<String, ScriptEngine> CACHED_ENGINES = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the cached engine for the given name, creating and caching it on first use.
+     * <p>
+     * A name for which no engine exists is not cached: the exception leaves the map unchanged, so a later call
+     * performs the lookup again.
+     *
+     * @param engineName the name the engine is registered under
+     * @return the engine for {@code engineName}; repeated calls return the same instance
+     * @throws IllegalArgumentException if the script engine manager knows no engine with that name
+     */
+    public static ScriptEngine get(final String engineName) {
+        // computeIfAbsent only runs the factory when the name has no entry yet
+        return CACHED_ENGINES.computeIfAbsent(engineName, name -> {
+            final ScriptEngine engine = SCRIPT_ENGINE_MANAGER.getEngineByName(name);
+            if (null == engine) {
+                throw new IllegalArgumentException("There is no script engine with provided name: " + name);
+            }
+            return engine;
+        });
+    }
+}