You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/19 23:49:01 UTC

[1/2] incubator-tinkerpop git commit: Migrated over to the proposed Memory model of using registered BinaryOperator reducers. It was really easy to change so that's good. All test cases pass for TinkerGraphComputer, one fails in SparkGraphComputer, and I

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1166 [created] b50a43ce7


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
index dbfadcc..405c9f3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -21,10 +21,10 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
@@ -43,25 +43,26 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public final class SparkMemory implements Memory.Admin, Serializable {
 
-    public final Set<String> memoryKeys = new HashSet<>();
+    public final Map<String, MemoryComputeKey> memoryKeys = new HashMap<>();
     private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
     private final AtomicLong runtime = new AtomicLong(0l);
-    private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
+    private final Map<String, Accumulator> memory = new HashMap<>();
     private Broadcast<Map<String, Object>> broadcast;
     private boolean inTask = false;
 
     public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
         if (null != vertexProgram) {
-            for (final String key : vertexProgram.getMemoryComputeKeys()) {
-                MemoryHelper.validateKey(key);
-                this.memoryKeys.add(key);
+            for (final MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) {
+                MemoryHelper.validateKey(key.getKey());
+                this.memoryKeys.put(key.getKey(), key);
             }
         }
         for (final MapReduce mapReduce : mapReducers) {
-            this.memoryKeys.add(mapReduce.getMemoryKey());
+            this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
         }
-        for (final String key : this.memoryKeys) {
-            this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), key, new RuleAccumulator()));
+        for (final MemoryComputeKey key : this.memoryKeys.values()) {
+            this.memory.put(key.getKey(), sparkContext.accumulator(null, key.getKey(), new MemoryAccumulator<>(key)));
+
         }
         this.broadcast = sparkContext.broadcast(new HashMap<>());
     }
@@ -73,7 +74,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         else {
             final Set<String> trueKeys = new HashSet<>();
             this.memory.forEach((key, value) -> {
-                if (value.value().getObject() != null)
+                if (value.value() != null)
                     trueKeys.add(key);
             });
             return Collections.unmodifiableSet(trueKeys);
@@ -115,39 +116,21 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     }
 
     @Override
-    public void incr(final String key, final long delta) {
-        checkKeyValue(key, delta);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.INCR, this.<Long>getValue(key) + delta));
-    }
-
-    @Override
-    public void and(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
-        if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
-        else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.AND, this.<Boolean>getValue(key) && bool));
-    }
-
-    @Override
-    public void or(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
+    public void add(final String key, final Object value) {
+        checkKeyValue(key, value);
         if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
+            this.memory.get(key).add(value);
         else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.OR, this.<Boolean>getValue(key) || bool));
+            this.memory.get(key).setValue(value);
     }
 
     @Override
     public void set(final String key, final Object value) {
         checkKeyValue(key, value);
         if (this.inTask)
-            this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
+            this.memory.get(key).add(value);
         else
-            this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
+            this.memory.get(key).setValue(value);
     }
 
     @Override
@@ -162,20 +145,20 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     protected void broadcastMemory(final JavaSparkContext sparkContext) {
         this.broadcast.destroy(true); // do we need to block?
         final Map<String, Object> toBroadcast = new HashMap<>();
-        this.memory.forEach((key, rule) -> {
-            if (null != rule.value().getObject())
-                toBroadcast.put(key, rule.value().getObject());
+        this.memory.forEach((key, object) -> {
+            if (null != object.value())
+                toBroadcast.put(key, object.value());
         });
         this.broadcast = sparkContext.broadcast(toBroadcast);
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.contains(key))
+        if (!this.memoryKeys.containsKey(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }
 
     private <R> R getValue(final String key) {
-        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
+        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index e721a76..49227af 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -138,7 +138,7 @@ public final class TinkerGraphComputer implements GraphComputer {
             final long time = System.currentTimeMillis();
             try (final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers)) {
                 if (null != this.vertexProgram) {
-                    TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getElementComputeKeys());
+                    TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
                     // execute the vertex program
                     this.vertexProgram.setup(this.memory);
                     this.memory.completeSubRound();
@@ -215,6 +215,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                 this.memory.complete();
                 // determine the resultant graph based on the result graph/persist state
                 final TinkerGraphComputerView view = TinkerHelper.getGraphComputerView(this.graph);
+                view.complete(); // drop all transient properties
                 final Graph resultGraph = null == view ? this.graph : view.processResultGraphPersist(this.resultGraph, this.persist);
                 TinkerHelper.dropGraphComputerView(this.graph);
                 return new DefaultComputerResult(resultGraph, this.memory.asImmutable());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
index 8d2681d..4c36e63 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -51,15 +52,16 @@ import java.util.stream.Stream;
 public final class TinkerGraphComputerView {
 
     private final TinkerGraph graph;
-    protected final Set<String> computeKeys;
+    protected final Map<String, VertexComputeKey> computeKeys;
     private Map<Element, Map<String, List<VertexProperty<?>>>> computeProperties;
     private final Set<Object> legalVertices = new HashSet<>();
     private final Map<Object, Set<Object>> legalEdges = new HashMap<>();
     private final GraphFilter graphFilter;
 
-    public TinkerGraphComputerView(final TinkerGraph graph, final GraphFilter graphFilter, final Set<String> computeKeys) {
+    public TinkerGraphComputerView(final TinkerGraph graph, final GraphFilter graphFilter, final Set<VertexComputeKey> computeKeys) {
         this.graph = graph;
-        this.computeKeys = computeKeys;
+        this.computeKeys = new HashMap<>();
+        computeKeys.forEach(key -> this.computeKeys.put(key.getKey(), key));
         this.computeProperties = new ConcurrentHashMap<>();
         this.graphFilter = graphFilter;
         if (this.graphFilter.hasFilter()) {
@@ -125,6 +127,11 @@ public final class TinkerGraphComputerView {
         return !this.graphFilter.hasEdgeFilter() || this.legalEdges.get(vertex.id()).contains(edge.id());
     }
 
+    protected void complete() {
+    //    this.computeKeys.values().stream().filter(ComputeKey.Vertex::isTransient).forEach(key ->
+    //            this.computeProperties.values().stream().flatMap(map -> map.get(key.getKey()).stream()).forEach(VertexProperty::remove));
+    }
+
     //////////////////////
 
     public Graph processResultGraphPersist(final GraphComputer.ResultGraph resultGraph,
@@ -195,7 +202,7 @@ public final class TinkerGraphComputerView {
     //////////////////////
 
     private boolean isComputeKey(final String key) {
-        return this.computeKeys.contains(key);
+        return this.computeKeys.containsKey(key);
     }
 
     private void addValue(final Vertex vertex, final String key, final VertexProperty property) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
index dac5766..54e064b 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
@@ -21,11 +21,12 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public final class TinkerMemory implements Memory.Admin {
 
-    public final Set<String> memoryKeys = new HashSet<>();
+    public final Map<String, MemoryComputeKey> memoryKeys = new HashMap<>();
     public Map<String, Object> previousMap;
     public Map<String, Object> currentMap;
     private final AtomicInteger iteration = new AtomicInteger(0);
@@ -47,13 +48,13 @@ public final class TinkerMemory implements Memory.Admin {
         this.currentMap = new ConcurrentHashMap<>();
         this.previousMap = new ConcurrentHashMap<>();
         if (null != vertexProgram) {
-            for (final String key : vertexProgram.getMemoryComputeKeys()) {
-                MemoryHelper.validateKey(key);
-                this.memoryKeys.add(key);
+            for (final MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) {
+                MemoryHelper.validateKey(key.getKey());
+                this.memoryKeys.put(key.getKey(), key);
             }
         }
         for (final MapReduce mapReduce : mapReducers) {
-            this.memoryKeys.add(mapReduce.getMemoryKey());
+            this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), (a, b) -> b, false));
         }
     }
 
@@ -90,11 +91,11 @@ public final class TinkerMemory implements Memory.Admin {
     protected void complete() {
         this.iteration.decrementAndGet();
         this.previousMap = this.currentMap;
+        this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(key -> this.previousMap.remove(key.getKey()));
     }
 
     protected void completeSubRound() {
         this.previousMap = new ConcurrentHashMap<>(this.currentMap);
-
     }
 
     @Override
@@ -112,27 +113,15 @@ public final class TinkerMemory implements Memory.Admin {
     }
 
     @Override
-    public void incr(final String key, final long delta) {
-        checkKeyValue(key, delta);
-        this.currentMap.compute(key, (k, v) -> null == v ? delta : delta + (Long) v);
-    }
-
-    @Override
-    public void and(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
-        this.currentMap.compute(key, (k, v) -> null == v ? bool : bool && (Boolean) v);
-    }
-
-    @Override
-    public void or(final String key, final boolean bool) {
-        checkKeyValue(key, bool);
-        this.currentMap.compute(key, (k, v) -> null == v ? bool : bool || (Boolean) v);
+    public void set(final String key, final Object value) {
+        checkKeyValue(key, value);
+        this.currentMap.put(key, value);
     }
 
     @Override
-    public void set(final String key, final Object value) {
+    public void add(final String key, final Object value) {
         checkKeyValue(key, value);
-        this.currentMap.put(key, value);
+        this.currentMap.compute(key, (k, v) -> null == v ? value : this.memoryKeys.get(key).getReducer().apply(v, value));
     }
 
     @Override
@@ -141,7 +130,7 @@ public final class TinkerMemory implements Memory.Admin {
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.contains(key))
+        if (!this.memoryKeys.containsKey(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
index 705fa49..53c9bfc 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/structure/TinkerHelper.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.tinkergraph.structure;
 
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -101,7 +102,7 @@ public final class TinkerHelper {
         return null != graph.graphComputerView;
     }
 
-    public static TinkerGraphComputerView createGraphComputerView(final TinkerGraph graph, final GraphFilter graphFilter, final Set<String> computeKeys) {
+    public static TinkerGraphComputerView createGraphComputerView(final TinkerGraph graph, final GraphFilter graphFilter, final Set<VertexComputeKey> computeKeys) {
         return graph.graphComputerView = new TinkerGraphComputerView(graph, graphFilter, computeKeys);
     }
 


[2/2] incubator-tinkerpop git commit: Migrated over to the proposed Memory model of using registered BinaryOperator reducers. It was really easy to change so that's good. All test cases pass for TinkerGraphComputer, one fails in SparkGraphComputer, and I

Posted by ok...@apache.org.
Migrated over to the proposed Memory model of using registered BinaryOperator reducers. It was really easy to change so that's good. All test cases pass for TinkerGraphComputer, one fails in SparkGraphComputer, and I have some NullPointer serialization issue with GiraphGraphComputer that I will fix later.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b50a43ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b50a43ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b50a43ce

Branch: refs/heads/TINKERPOP-1166
Commit: b50a43ce781572a1610fa3e31b5132205796af67
Parents: 12a6917
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 19 15:48:50 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 19 15:48:50 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  11 +-
 .../giraph/process/computer/GiraphMemory.java   | 103 ++++++------
 .../process/computer/MemoryAggregator.java      |  76 ++++-----
 .../gremlin/process/computer/Memory.java        |  31 +---
 .../process/computer/MemoryComputeKey.java      | 160 +++++++++++++++++++
 .../process/computer/VertexComputeKey.java      |  56 +++++++
 .../gremlin/process/computer/VertexProgram.java |   4 +-
 .../bulkloading/BulkLoaderVertexProgram.java    |   7 +-
 .../peerpressure/PeerPressureVertexProgram.java |  18 ++-
 .../ranking/pagerank/PageRankVertexProgram.java |   9 +-
 .../traversal/TraversalVertexProgram.java       |  14 +-
 .../process/computer/util/ComputerGraph.java    |   6 +-
 .../process/computer/util/ImmutableMemory.java  |  12 +-
 .../process/computer/util/MapMemory.java        |  40 ++---
 .../computer/util/VertexProgramHelper.java      |  15 ++
 .../process/computer/GraphComputerTest.java     |  97 +++++------
 .../computer/util/ComputerSubmissionHelper.java |   2 +-
 .../hadoop/process/computer/util/Rule.java      | 119 --------------
 .../process/computer/MemoryAccumulator.java     |  50 ++++++
 .../spark/process/computer/RuleAccumulator.java |  55 -------
 .../spark/process/computer/SparkExecutor.java   |   5 +-
 .../process/computer/SparkGraphComputer.java    |   3 +-
 .../spark/process/computer/SparkMemory.java     |  61 +++----
 .../process/computer/TinkerGraphComputer.java   |   3 +-
 .../computer/TinkerGraphComputerView.java       |  15 +-
 .../process/computer/TinkerMemory.java          |  39 ++---
 .../tinkergraph/structure/TinkerHelper.java     |   3 +-
 27 files changed, 516 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index f031137..83eb436 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -52,6 +52,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
@@ -197,13 +198,13 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 if (!job.run(true))
                     throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs");  // how do I get the exception that occured?
                 // add vertex program memory values to the return memory
-                for (final String memoryKey : this.vertexProgram.getMemoryComputeKeys()) {
-                    if (storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey))) {
-                        final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey)));
+                for (final MemoryComputeKey memoryKey : this.vertexProgram.getMemoryComputeKeys()) {
+                    if (storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()))) {
+                        final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey())));
                         if (iterator.hasNext()) {
-                            this.memory.set(memoryKey, iterator.next().getValue());
+                            this.memory.set(memoryKey.getKey(), iterator.next().getValue());
                         }
-                        storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey));
+                        storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()));
                     }
                 }
                 final Path path = new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
index b0cd3f9..50c0058 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
@@ -24,22 +24,26 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.util.HashSet;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -48,7 +52,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     private VertexProgram<?> vertexProgram;
     private GiraphWorkerContext worker;
-    private Set<String> memoryKeys;
+    private Map<String, MemoryComputeKey> memoryKeys;
     private boolean isMasterCompute = true;
     private long startTime = System.currentTimeMillis();
 
@@ -59,7 +63,8 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     public GiraphMemory(final GiraphWorkerContext worker, final VertexProgram<?> vertexProgram) {
         this.worker = worker;
         this.vertexProgram = vertexProgram;
-        this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
+        this.memoryKeys = new HashMap<>();
+        this.vertexProgram.getMemoryComputeKeys().forEach(key -> this.memoryKeys.put(key.getKey(), key));
         this.isMasterCompute = false;
     }
 
@@ -76,11 +81,12 @@ public final class GiraphMemory extends MasterCompute implements Memory {
         if (0 == this.getSuperstep()) { // setup
             final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getConf());
             this.vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            this.memoryKeys = new HashSet<>(this.vertexProgram.getMemoryComputeKeys());
+            this.memoryKeys = new HashMap<>();
+            this.vertexProgram.getMemoryComputeKeys().forEach(key -> this.memoryKeys.put(key.getKey(), key));
             try {
-                for (final String key : this.memoryKeys) {
-                    MemoryHelper.validateKey(key);
-                    this.registerPersistentAggregator(key, MemoryAggregator.class);
+                for (final MemoryComputeKey key : this.memoryKeys.values()) {
+                    this.registerPersistentAggregator(key.getKey(), MemoryAggregator.class);
+                    this.setAggregatedValue(key.getKey(), new ObjectWritable(key));
                 }
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
@@ -129,80 +135,71 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     @Override
     public Set<String> keys() {
-        return this.memoryKeys.stream().filter(this::exists).collect(Collectors.toSet());
+        return this.memoryKeys.keySet().stream().filter(this::exists).collect(java.util.stream.Collectors.toSet()); // snapshot of the keys that currently hold a value; never the live internal key set
     }
 
     @Override
     public boolean exists(final String key) {
-        final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
-        return null != rule.getObject();
+        final ObjectWritable value = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+        return null != value && !value.isEmpty(); // MemoryAggregator hands back ObjectWritable.empty() when nothing is stored, so a null check alone is not enough
     }
 
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
         //this.checkKey(key);
-        final Rule rule = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
-        if (null == rule.getObject())
+        final ObjectWritable value = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+        if (null == value || value.isEmpty()) // an empty wrapper means no value has been stored for this key
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
-            return rule.getObject();
+            return (R) value.get(); // unwrap: callers expect the stored value, not the ObjectWritable around it
     }
 
     @Override
-    public void set(final String key, Object value) {
+    public void set(final String key, final Object value) {
         this.checkKeyValue(key, value);
         if (this.isMasterCompute)
-            this.setAggregatedValue(key, new Rule(Rule.Operation.SET, value));
+            this.setAggregatedValue(key, new ObjectWritable<>(value));
         else
-            this.worker.aggregate(key, new Rule(Rule.Operation.SET, value));
+            this.worker.aggregate(key, new ObjectWritable<>(value));
     }
 
     @Override
-    public void and(final String key, final boolean bool) {
-        this.checkKeyValue(key, bool);
+    public void add(final String key, final Object value) {
+        this.checkKeyValue(key, value);
         if (this.isMasterCompute) {  // only called on setup() and terminate()
-            Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
-            value = null == value ? bool : bool && value;
-            this.setAggregatedValue(key, new Rule(Rule.Operation.AND, value));
-        } else {
-            this.worker.aggregate(key, new Rule(Rule.Operation.AND, bool));
-        }
-    }
-
-    @Override
-    public void or(final String key, final boolean bool) {
-        this.checkKeyValue(key, bool);
-        if (this.isMasterCompute) {   // only called on setup() and terminate()
-            Boolean value = this.<Rule>getAggregatedValue(key).<Boolean>getObject();
-            value = null == value ? bool : bool || value;
-            this.setAggregatedValue(key, new Rule(Rule.Operation.OR, value));
-        } else {
-            this.worker.aggregate(key, new Rule(Rule.Operation.OR, bool));
-        }
-    }
-
-    @Override
-    public void incr(final String key, final long delta) {
-        this.checkKeyValue(key, delta);
-        if (this.isMasterCompute) {   // only called on setup() and terminate()
-            Number value = this.<Rule>getAggregatedValue(key).<Number>getObject();
-            value = null == value ? delta : value.longValue() + delta;
-            this.setAggregatedValue(key, new Rule(Rule.Operation.INCR, value));
+            this.setAggregatedValue(key, new ObjectWritable<>(value));
         } else {
-            this.worker.aggregate(key, new Rule(Rule.Operation.INCR, delta));
+            this.worker.aggregate(key, new ObjectWritable<>(value));
         }
     }
 
     @Override
     public void write(final DataOutput output) {
-        // no need to serialize the master compute as it gets its data from aggregators
-        // is this true?
+        try {
+            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+            final ObjectOutputStream objects = new ObjectOutputStream(bytes);
+            objects.writeObject(this.memoryKeys);
+            objects.flush(); // push the serialized map into `bytes` before its size is taken
+            output.writeInt(bytes.size()); // length prefix that readFields() uses to size its buffer
+            output.write(bytes.toByteArray());
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
     }
 
     @Override
     public void readFields(final DataInput input) {
-        // no need to serialize the master compute as it gets its data from aggregators
-        // is this true?
+        try {
+            final byte[] in = new byte[input.readInt()];
+            for (int i = 0; i < in.length; i++) {
+                in[i] = input.readByte();
+            }
+            final ByteArrayInputStream bytes = new ByteArrayInputStream(in);
+            final ObjectInputStream objects = new ObjectInputStream(bytes);
+            this.memoryKeys = (Map<String, MemoryComputeKey>) objects.readObject();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
     }
 
     @Override
@@ -211,7 +208,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.contains(key))
+        if (!this.memoryKeys.containsKey(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
index 1e56892..4180d2f 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/MemoryAggregator.java
@@ -19,76 +19,56 @@
 package org.apache.tinkerpop.gremlin.giraph.process.computer;
 
 import org.apache.giraph.aggregators.Aggregator;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryAggregator implements Aggregator<Rule> {
+public final class MemoryAggregator implements Aggregator<ObjectWritable> {
 
-    private Object currentObject;
-    private Rule.Operation lastOperation = null;
+    private ObjectWritable currentObject;
+    private MemoryComputeKey memoryComputeKey;
 
-    public MemoryAggregator() {
-        this.currentObject = null;
+    public MemoryAggregator() { // for Giraph serialization
+        this.currentObject = ObjectWritable.empty(); // same starting state as reset(), so aggregate() never dereferences null
+    }
+
+    public MemoryAggregator(final MemoryComputeKey memoryComputeKey) {
+        this.currentObject = ObjectWritable.empty();
+        this.memoryComputeKey = memoryComputeKey;
     }
 
     @Override
-    public Rule getAggregatedValue() {
-        if (null == this.currentObject)
-            return createInitialValue();
-        else if (this.currentObject instanceof Long)
-            return new Rule(Rule.Operation.INCR, this.currentObject);
-        else
-            return new Rule(null == this.lastOperation ? Rule.Operation.NO_OP : this.lastOperation, this.currentObject);
+    public ObjectWritable getAggregatedValue() {
+        return this.currentObject;
     }
 
     @Override
-    public void setAggregatedValue(final Rule rule) {
-        this.currentObject = null == rule ? null : rule.getObject();
+    public void setAggregatedValue(final ObjectWritable object) {
+        if (null == object)
+            this.currentObject = ObjectWritable.empty();
+        else if (object.get() instanceof MemoryComputeKey)
+            this.memoryComputeKey = (MemoryComputeKey) object.get();
+        else
+            this.currentObject = object;
     }
 
     @Override
     public void reset() {
-        this.currentObject = null;
+        this.currentObject = ObjectWritable.empty();
     }
 
     @Override
-    public Rule createInitialValue() {
-        return new Rule(Rule.Operation.NO_OP, null);
+    public ObjectWritable createInitialValue() {
+        return ObjectWritable.empty();
     }
 
     @Override
-    public void aggregate(final Rule ruleWritable) {
-        final Rule.Operation rule = ruleWritable.getOperation();
-        final Object object = ruleWritable.getObject();
-        if (rule != Rule.Operation.NO_OP)
-            this.lastOperation = rule;
-
-        if (null == this.currentObject || rule.equals(Rule.Operation.SET)) {
+    public void aggregate(final ObjectWritable object) {
+        if (this.currentObject.isEmpty())
             this.currentObject = object;
-        } else {
-            if (rule.equals(Rule.Operation.INCR)) {
-                this.currentObject = (Long) this.currentObject + (Long) object;
-            } else if (rule.equals(Rule.Operation.AND)) {
-                this.currentObject = (Boolean) this.currentObject && (Boolean) object;
-            } else if (rule.equals(Rule.Operation.OR)) {
-                this.currentObject = (Boolean) this.currentObject || (Boolean) object;
-            } else if (rule.equals(Rule.Operation.NO_OP)) {
-                if (object instanceof Boolean) { // only happens when NO_OP booleans are being propagated will this occur
-                    if (null == this.lastOperation) {
-                        // do nothing ... why?
-                    } else if (this.lastOperation.equals(Rule.Operation.AND)) {
-                        this.currentObject = (Boolean) this.currentObject && (Boolean) object;
-                    } else if (this.lastOperation.equals(Rule.Operation.OR)) {
-                        this.currentObject = (Boolean) this.currentObject || (Boolean) object;
-                    } else {
-                        throw new IllegalStateException("This state should not have occurred: " + ruleWritable);
-                    }
-                }
-            } else {
-                throw new IllegalArgumentException("The provided rule is unknown: " + ruleWritable);
-            }
-        }
+        else
+            this.currentObject.set(this.memoryComputeKey.getReducer().apply(this.currentObject.get(), object.get()));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
index abae106..16dbb78 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Memory.java
@@ -62,6 +62,8 @@ public interface Memory {
      */
     public <R> R get(final String key) throws IllegalArgumentException;
 
+    public void set(final String key, final Object value) throws IllegalArgumentException, IllegalStateException;
+
     /**
      * Set the value of the provided key. This is typically called in setup() and/or terminate() of the {@link VertexProgram}.
      * If this is called during execute(), there is no guarantee as to the ultimately stored value as call order is indeterminate.
@@ -69,7 +71,7 @@ public interface Memory {
      * @param key   they key to set a value for
      * @param value the value to set for the key
      */
-    public void set(final String key, final Object value);
+    public void add(final String key, final Object value);
 
     /**
      * A helper method that generates a {@link Map} of the memory key/values.
@@ -78,9 +80,8 @@ public interface Memory {
      */
     public default Map<String, Object> asMap() {
         final Map<String, Object> map = keys().stream()
-                .filter(this::exists)
                 .map(key -> Pair.with(key, get(key)))
-                .collect(Collectors.toMap(kv -> kv.getValue0(), Pair::getValue1));
+                .collect(Collectors.toMap(Pair::getValue0, Pair::getValue1));
         return Collections.unmodifiableMap(map);
     }
 
@@ -99,30 +100,6 @@ public interface Memory {
     public long getRuntime();
 
     /**
-     * Add the provided delta value to the long value currently stored at the key.
-     *
-     * @param key   the key of the long value
-     * @param delta the adjusting amount (can be negative for decrement)
-     */
-    public void incr(final String key, final long delta);
-
-    /**
-     * Logically AND the provided boolean value with the boolean value currently stored at the key.
-     *
-     * @param key  the key of the boolean value
-     * @param bool the boolean to AND
-     */
-    public void and(final String key, final boolean bool);
-
-    /**
-     * Logically OR the provided boolean value with the boolean value currently stored at the key.
-     *
-     * @param key  the key of the boolean value
-     * @param bool the boolean to OR
-     */
-    public void or(final String key, final boolean bool);
-
-    /**
      * A helper method that states whether the current iteration is 0.
      *
      * @return whether this is the first iteration

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
new file mode 100644
index 0000000..71499f0
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
+
+import java.io.Serializable;
+import java.util.function.BinaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MemoryComputeKey<A> implements Serializable {
+
+    private final String key;
+    private final BinaryOperator<A> reducer;
+    private final boolean isTransient;
+
+    private MemoryComputeKey(final String key, final BinaryOperator<A> reducer, final boolean isTransient) {
+        this.key = key;
+        this.reducer = reducer;
+        this.isTransient = isTransient;
+        MemoryHelper.validateKey(key);
+    }
+
+    public String getKey() {
+        return this.key;
+    }
+
+    public boolean isTransient() {
+        return this.isTransient;
+    }
+
+    public BinaryOperator<A> getReducer() {
+        return this.reducer;
+    }
+
+    @Override
+    public int hashCode() {
+        return this.key.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object object) {
+        return object instanceof MemoryComputeKey && ((MemoryComputeKey) object).key.equals(this.key);
+    }
+
+    public static <A> MemoryComputeKey<A> of(final String key, final BinaryOperator<A> reducer, final boolean isTransient) { // static factory; the private constructor validates the key via MemoryHelper.validateKey
+        return new MemoryComputeKey<>(key, reducer, isTransient);
+    }
+
+    public static SetOperator setOperator() {
+        return SetOperator.INSTANCE;
+    }
+
+    public static AndOperator andOperator() {
+        return AndOperator.INSTANCE;
+    }
+
+    public static OrOperator orOperator() {
+        return OrOperator.INSTANCE;
+    }
+
+    public static SumLongOperator sumLongOperator() {
+        return SumLongOperator.INSTANCE;
+    }
+
+    public static SumIntegerOperator sumIntegerOperator() {
+        return SumIntegerOperator.INSTANCE;
+    }
+
+    ///////////
+
+    public static class SetOperator implements BinaryOperator<Object>, Serializable {
+
+        private static final SetOperator INSTANCE = new SetOperator();
+
+        private SetOperator() {
+
+        }
+
+        @Override
+        public Object apply(final Object first, final Object second) {
+            return second;
+        }
+    }
+
+    public static class AndOperator implements BinaryOperator<Boolean>, Serializable {
+
+        private static final AndOperator INSTANCE = new AndOperator();
+
+        private AndOperator() {
+
+        }
+
+        @Override
+        public Boolean apply(final Boolean first, final Boolean second) {
+            return first && second;
+        }
+    }
+
+    public static class OrOperator implements BinaryOperator<Boolean>, Serializable {
+
+        private static final OrOperator INSTANCE = new OrOperator();
+
+        private OrOperator() {
+
+        }
+
+        @Override
+        public Boolean apply(final Boolean first, final Boolean second) {
+            return first || second;
+        }
+    }
+
+    public static class SumLongOperator implements BinaryOperator<Long>, Serializable {
+
+        private static final SumLongOperator INSTANCE = new SumLongOperator();
+
+        private SumLongOperator() {
+
+        }
+
+        @Override
+        public Long apply(final Long first, final Long second) {
+            return first + second;
+        }
+    }
+
+    public static class SumIntegerOperator implements BinaryOperator<Integer>, Serializable {
+
+        private static final SumIntegerOperator INSTANCE = new SumIntegerOperator();
+
+        private SumIntegerOperator() {
+
+        }
+
+        @Override
+        public Integer apply(final Integer first, final Integer second) {
+            return first + second;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java
new file mode 100644
index 0000000..3b106b7
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexComputeKey.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VertexComputeKey {
+
+    private final String key;
+    private final boolean isTransient;
+
+    private VertexComputeKey(final String key, final boolean isTransient) {
+        this.key = key;
+        this.isTransient = isTransient;
+    }
+
+    public String getKey() {
+        return this.key;
+    }
+
+    public boolean isTransient() {
+        return this.isTransient;
+    }
+
+    @Override
+    public int hashCode() {
+        return this.key.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object object) {
+        return object instanceof VertexComputeKey && ((VertexComputeKey) object).key.equals(this.key);
+    }
+
+    public static VertexComputeKey of(final String key, final boolean isTransient) {
+        return new VertexComputeKey(key, isTransient);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
index 37ff8e7..15243fa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
@@ -133,7 +133,7 @@ public interface VertexProgram<M> extends Cloneable {
      *
      * @return the set of element keys that will be mutated during the vertex program's execution
      */
-    public default Set<String> getElementComputeKeys() {
+    public default Set<VertexComputeKey> getVertexComputeKeys() {
         return Collections.emptySet();
     }
 
@@ -144,7 +144,7 @@ public interface VertexProgram<M> extends Cloneable {
      *
      * @return the set of memory keys that will be read/written
      */
-    public default Set<String> getMemoryComputeKeys() {
+    public default Set<MemoryComputeKey> getMemoryComputeKeys() {
         return Collections.emptySet();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
index 6853531..a6d0f5b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgram.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -70,7 +71,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     public static final String DEFAULT_BULK_LOADER_VERTEX_ID = "bulkLoader.vertex.id";
 
     private final MessageScope messageScope;
-    private final Set<String> elementComputeKeys;
+    private final Set<VertexComputeKey> elementComputeKeys;
     private Configuration configuration;
     private BulkLoader bulkLoader;
     private Graph graph;
@@ -152,7 +153,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
             ConfigurationUtils.copy(config, configuration);
         }
         intermediateBatchSize = configuration.getLong(INTERMEDIATE_BATCH_SIZE_CFG_KEY, 0L);
-        elementComputeKeys.add(DEFAULT_BULK_LOADER_VERTEX_ID);
+        elementComputeKeys.add(VertexComputeKey.of(DEFAULT_BULK_LOADER_VERTEX_ID, true));
         bulkLoader = createBulkLoader();
     }
 
@@ -272,7 +273,7 @@ public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {
     }
 
     @Override
-    public Set<String> getElementComputeKeys() {
+    public Set<VertexComputeKey> getVertexComputeKeys() {
         return elementComputeKeys;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
index 6c0034d..de4fdb9 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
@@ -21,8 +21,10 @@ package org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure;
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -71,7 +73,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
     private boolean distributeVote = false;
     private String property = CLUSTER;
 
-    private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
+    private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
 
     private PeerPressureVertexProgram() {
 
@@ -100,12 +102,12 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
     }
 
     @Override
-    public Set<String> getElementComputeKeys() {
-        return new HashSet<>(Arrays.asList(this.property, VOTE_STRENGTH));
+    public Set<VertexComputeKey> getVertexComputeKeys() {
+        return new HashSet<>(Arrays.asList(VertexComputeKey.of(this.property, false), VertexComputeKey.of(VOTE_STRENGTH, true)));
     }
 
     @Override
-    public Set<String> getMemoryComputeKeys() {
+    public Set<MemoryComputeKey> getMemoryComputeKeys() {
         return MEMORY_COMPUTE_KEYS;
     }
 
@@ -139,21 +141,21 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
                 vertex.property(VertexProperty.Cardinality.single, this.property, vertex.id());
                 vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
                 messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
-                memory.and(VOTE_TO_HALT, false);
+                memory.add(VOTE_TO_HALT, false);
             }
         } else if (1 == memory.getIteration() && this.distributeVote) {
             double voteStrength = 1.0d / IteratorUtils.reduce(IteratorUtils.map(messenger.receiveMessages(), Pair::getValue1), 0.0d, (a, b) -> a + b);
             vertex.property(VertexProperty.Cardinality.single, this.property, vertex.id());
             vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
             messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
-            memory.and(VOTE_TO_HALT, false);
+            memory.add(VOTE_TO_HALT, false);
         } else {
             final Map<Serializable, Double> votes = new HashMap<>();
             votes.put(vertex.value(this.property), vertex.<Double>value(VOTE_STRENGTH));
             messenger.receiveMessages().forEachRemaining(message -> MapHelper.incr(votes, message.getValue0(), message.getValue1()));
             Serializable cluster = PeerPressureVertexProgram.largestCount(votes);
             if (null == cluster) cluster = (Serializable) vertex.id();
-            memory.and(VOTE_TO_HALT, vertex.value(this.property).equals(cluster));
+            memory.add(VOTE_TO_HALT, vertex.value(this.property).equals(cluster));
             vertex.property(VertexProperty.Cardinality.single, this.property, cluster);
             messenger.sendMessage(this.voteScope, new Pair<>(cluster, vertex.<Double>value(VOTE_STRENGTH)));
         }
@@ -165,7 +167,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
         if (voteToHalt) {
             return true;
         } else {
-            memory.or(VOTE_TO_HALT, true);
+            memory.set(VOTE_TO_HALT, true);
             return false;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
index 2092e65..bb642e2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ranking/pagerank/PageRankVertexProgram.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -68,7 +69,7 @@ public class PageRankVertexProgram implements VertexProgram<Double> {
     private double alpha = 0.85d;
     private int totalIterations = 30;
     private String property = PAGE_RANK;
-    private Set<String> computeKeys;
+    private Set<VertexComputeKey> vertexComputeKeys;
 
     private PageRankVertexProgram() {
 
@@ -87,7 +88,7 @@ public class PageRankVertexProgram implements VertexProgram<Double> {
         this.alpha = configuration.getDouble(ALPHA, 0.85d);
         this.totalIterations = configuration.getInt(TOTAL_ITERATIONS, 30);
         this.property = configuration.getString(PROPERTY, PAGE_RANK);
-        this.computeKeys = new HashSet<>(Arrays.asList(this.property, EDGE_COUNT));
+        this.vertexComputeKeys = new HashSet<>(Arrays.asList(VertexComputeKey.of(this.property, false), VertexComputeKey.of(EDGE_COUNT, true)));
     }
 
     @Override
@@ -114,8 +115,8 @@ public class PageRankVertexProgram implements VertexProgram<Double> {
     }
 
     @Override
-    public Set<String> getElementComputeKeys() {
-        return this.computeKeys;
+    public Set<VertexComputeKey> getVertexComputeKeys() {
+        return this.vertexComputeKeys;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index f5bf229..29ff45e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -22,9 +22,11 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.sideEffect.mapreduce.TraverserMapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
@@ -78,8 +80,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
 
     // TODO: if not an adjacent traversal, use Local message scope -- a dual messaging system.
     private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
-    private static final Set<String> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(HALTED_TRAVERSERS, TraversalSideEffects.SIDE_EFFECTS));
-    private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
+    private static final Set<VertexComputeKey> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(VertexComputeKey.of(HALTED_TRAVERSERS, false), VertexComputeKey.of(TraversalSideEffects.SIDE_EFFECTS, true)));
+    private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true)));
 
     private PureTraversal<?, ?> traversal;
     private TraversalMatrix<?, ?> traversalMatrix;
@@ -160,9 +162,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         aliveTraverses.add((Traverser.Admin) traverser);
                 });
             }
-            memory.and(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix));
+            memory.add(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix));
         } else {  // ITERATION 1+
-            memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
+            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
         }
     }
 
@@ -178,12 +180,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     }
 
     @Override
-    public Set<String> getElementComputeKeys() {
+    public Set<VertexComputeKey> getVertexComputeKeys() {
         return ELEMENT_COMPUTE_KEYS;
     }
 
     @Override
-    public Set<String> getMemoryComputeKeys() {
+    public Set<MemoryComputeKey> getMemoryComputeKeys() {
         return MEMORY_COMPUTE_KEYS;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerGraph.java
index 8c818f1..31438c3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ComputerGraph.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.computer.util;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -29,7 +30,6 @@ import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Transaction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.wrapped.WrappedEdge;
 import org.apache.tinkerpop.gremlin.structure.util.wrapped.WrappedElement;
 import org.apache.tinkerpop.gremlin.structure.util.wrapped.WrappedProperty;
@@ -55,9 +55,9 @@ public final class ComputerGraph implements Graph {
     private final Set<String> computeKeys;
     private State state;
 
-    private ComputerGraph(final State state, final Vertex starVertex, final Optional<VertexProgram> vertexProgram) {
+    private ComputerGraph(final State state, final Vertex starVertex, final Optional<VertexProgram<?>> vertexProgram) {
         this.state = state;
-        this.computeKeys = vertexProgram.isPresent() ? vertexProgram.get().getElementComputeKeys() : Collections.emptySet();
+        this.computeKeys = vertexProgram.isPresent() ? vertexProgram.get().getVertexComputeKeys().stream().map(VertexComputeKey::getKey).collect(Collectors.toSet()) : Collections.emptySet();
         this.starVertex = new ComputerVertex(starVertex);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java
index 1c2bb97..45107b5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/ImmutableMemory.java
@@ -60,17 +60,7 @@ public final class ImmutableMemory implements Memory.Admin {
     }
 
     @Override
-    public void incr(final String key, final long delta) {
-        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
-    }
-
-    @Override
-    public void and(final String key, final boolean bool) {
-        throw Memory.Exceptions.memoryIsCurrentlyImmutable();
-    }
-
-    @Override
-    public void or(final String key, final boolean bool) {
+    public void add(final String key, final Object value) {
         throw Memory.Exceptions.memoryIsCurrentlyImmutable();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
index 71bd185..d63094a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
@@ -21,12 +21,12 @@ package org.apache.tinkerpop.gremlin.process.computer.util;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -38,7 +38,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
     private long runtime = 0l;
     private int iteration = -1;
     private final Map<String, Object> memoryMap = new HashMap<>();
-    private final Set<String> memoryComputeKeys = new HashSet<>();
+    private final Map<String, MemoryComputeKey> memoryComputeKeys = new HashMap<>();
 
     public MapMemory() {
 
@@ -52,11 +52,11 @@ public final class MapMemory implements Memory.Admin, Serializable {
     }
 
     public void addVertexProgramMemoryComputeKeys(final VertexProgram<?> vertexProgram) {
-        this.memoryComputeKeys.addAll(vertexProgram.getMemoryComputeKeys());
+        vertexProgram.getMemoryComputeKeys().forEach(key -> this.memoryComputeKeys.put(key.getKey(), key));
     }
 
     public void addMapReduceMemoryKey(final MapReduce mapReduce) {
-        this.memoryComputeKeys.add(mapReduce.getMemoryKey());
+        //this.memoryComputeKeys.add(mapReduce.getMemoryKey());
     }
 
     @Override
@@ -89,35 +89,13 @@ public final class MapMemory implements Memory.Admin, Serializable {
     }
 
     @Override
-    public void incr(final String key, final long delta) {
-        this.checkKeyValue(key, delta);
+    public void add(final String key, final Object value) {
+        this.checkKeyValue(key, value);
         if (this.memoryMap.containsKey(key)) {
-            final long newValue = (long) this.memoryMap.get(key) + delta;
+            final Object newValue = this.memoryComputeKeys.get(key).getReducer().apply(this.memoryMap.get(key), value);
             this.memoryMap.put(key, newValue);
         } else {
-            this.memoryMap.put(key, delta);
-        }
-    }
-
-    @Override
-    public void and(final String key, final boolean bool) {
-        this.checkKeyValue(key, bool);
-        if (this.memoryMap.containsKey(key)) {
-            final boolean newValue = (boolean) this.memoryMap.get(key) && bool;
-            this.memoryMap.put(key, newValue);
-        } else {
-            this.memoryMap.put(key, bool);
-        }
-    }
-
-    @Override
-    public void or(final String key, final boolean bool) {
-        this.checkKeyValue(key, bool);
-        if (this.memoryMap.containsKey(key)) {
-            final boolean newValue = (boolean) this.memoryMap.get(key) || bool;
-            this.memoryMap.put(key, newValue);
-        } else {
-            this.memoryMap.put(key, bool);
+            this.memoryMap.put(key, value);
         }
     }
 
@@ -142,7 +120,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
     }
 
     private final void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryComputeKeys.contains(key))
+        if (!this.memoryComputeKeys.containsKey(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
index 3f51ba6..a0b0c25 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.process.computer.util;
 
 import org.apache.commons.configuration.AbstractConfiguration;
 import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep;
@@ -28,6 +29,8 @@ import org.apache.tinkerpop.gremlin.util.Serializer;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -37,6 +40,18 @@ public final class VertexProgramHelper {
     private VertexProgramHelper() {
     }
 
+    public static Set<String> vertexComputeKeysAsSet(final Set<VertexComputeKey> vertexComputeKeySet) {
+        final Set<String> set = new HashSet<>(vertexComputeKeySet.size());
+        for (final VertexComputeKey key : vertexComputeKeySet) {
+            set.add(key.getKey());
+        }
+        return set;
+    }
+
+    public static String[] vertexComputeKeysAsArray(final Set<VertexComputeKey> vertexComputeKeySet) {
+        return VertexProgramHelper.vertexComputeKeysAsSet(vertexComputeKeySet).toArray(new String[vertexComputeKeySet.size()]);
+    }
+
     public static void serialize(final Object object, final Configuration configuration, final String key) {
         if (configuration instanceof AbstractConfiguration)
             ((AbstractConfiguration) configuration).setDelimiterParsingDisabled(true);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 254ac31..d33d80d 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -103,6 +103,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
     }
 
+
     /////////////////////////////////////////////
     @Test
     @LoadGraphWith(MODERN)
@@ -178,19 +179,19 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         try {
-            results.memory().incr("incr", 1);
+            results.memory().add("incr", 1);
         } catch (Exception ex) {
             validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
         }
 
         try {
-            results.memory().and("and", true);
+            results.memory().add("and", true);
         } catch (Exception ex) {
             validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
         }
 
         try {
-            results.memory().or("or", false);
+            results.memory().add("or", false);
         } catch (Exception ex) {
             validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
         }
@@ -217,8 +218,12 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getMemoryComputeKeys() {
-            return new HashSet<>(Arrays.asList("set", "incr", "and", "or"));
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return new HashSet<>(Arrays.asList(
+                    MemoryComputeKey.of("set", MemoryComputeKey.setOperator(), false),
+                    MemoryComputeKey.of("incr", MemoryComputeKey.sumLongOperator(), false),
+                    MemoryComputeKey.of("and", MemoryComputeKey.andOperator(), false),
+                    MemoryComputeKey.of("or", MemoryComputeKey.orOperator(), false)));
         }
 
         @Override
@@ -266,8 +271,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getMemoryComputeKeys() {
-            return new HashSet<>(Arrays.asList(null));
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return Collections.singleton(null);
         }
 
         @Override
@@ -316,8 +321,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getMemoryComputeKeys() {
-            return new HashSet<>(Arrays.asList(""));
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return Collections.singleton(MemoryComputeKey.of("", MemoryComputeKey.orOperator(), false));
         }
 
         @Override
@@ -456,8 +461,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         assertTrue(results.memory().keys().contains("b"));
         assertTrue(results.memory().getRuntime() >= 0);
 
-        assertEquals(Long.valueOf(12), results.memory().<Long>get("a"));   // 2 iterations
-        assertEquals(Long.valueOf(28), results.memory().<Long>get("b"));
+        assertEquals(12, results.memory().<Integer>get("a").intValue());   // 2 iterations
+        assertEquals(28, results.memory().<Integer>get("b").intValue());
         try {
             results.memory().get("BAD");
             fail("Should throw an IllegalArgumentException");
@@ -490,10 +495,10 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
                 fail("Should throw an IllegalArgumentException: " + e);
             }
 
-            memory.incr("a", 1);
+            memory.add("a", 1);
             if (memory.isInitialIteration()) {
                 vertex.property(VertexProperty.Cardinality.single, "nameLengthCounter", vertex.<String>value("name").length());
-                memory.incr("b", vertex.<String>value("name").length());
+                memory.add("b", vertex.<String>value("name").length());
             } else {
                 vertex.property(VertexProperty.Cardinality.single, "nameLengthCounter", vertex.<String>value("name").length() + vertex.<Integer>value("nameLengthCounter"));
             }
@@ -505,13 +510,15 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getElementComputeKeys() {
-            return new HashSet<>(Arrays.asList("nameLengthCounter"));
+        public Set<VertexComputeKey> getVertexComputeKeys() {
+            return Collections.singleton(VertexComputeKey.of("nameLengthCounter", false));
         }
 
         @Override
-        public Set<String> getMemoryComputeKeys() {
-            return new HashSet<>(Arrays.asList("a", "b"));
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return new HashSet<>(Arrays.asList(
+                    MemoryComputeKey.of("a", MemoryComputeKey.sumIntegerOperator(), false),
+                    MemoryComputeKey.of("b", MemoryComputeKey.sumIntegerOperator(), false)));
         }
 
         @Override
@@ -582,12 +589,12 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
             assertEquals(memory.getIteration(), memory.<Integer>get("f").intValue());
 
             // update current step values
-            memory.incr("a", 1l);
-            memory.incr("b", 1l);
-            memory.and("c", false);
-            memory.or("d", true);
-            memory.and("e", false);
-            memory.set("f", memory.getIteration() + 1);
+            memory.add("a", 1l);
+            memory.add("b", 1l);
+            memory.add("c", false);
+            memory.add("d", true);
+            memory.add("e", false);
+            memory.add("f", memory.getIteration() + 1);
 
             // test current step values, should be the same as previous prior to update
             assertEquals(Long.valueOf(6 * memory.getIteration()), memory.get("a"));
@@ -617,13 +624,14 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getElementComputeKeys() {
-            return Collections.emptySet();
-        }
-
-        @Override
-        public Set<String> getMemoryComputeKeys() {
-            return new HashSet<>(Arrays.asList("a", "b", "c", "d", "e", "f"));
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return new HashSet<>(Arrays.asList(
+                    MemoryComputeKey.of("a", MemoryComputeKey.sumLongOperator(), false),
+                    MemoryComputeKey.of("b", MemoryComputeKey.sumLongOperator(), false),
+                    MemoryComputeKey.of("c", MemoryComputeKey.andOperator(), false),
+                    MemoryComputeKey.of("d", MemoryComputeKey.orOperator(), false),
+                    MemoryComputeKey.of("e", MemoryComputeKey.andOperator(), false),
+                    MemoryComputeKey.of("f", MemoryComputeKey.setOperator(), false)));
         }
 
         @Override
@@ -713,13 +721,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getElementComputeKeys() {
-            return new HashSet<>(Arrays.asList("counter"));
-        }
-
-        @Override
-        public Set<String> getMemoryComputeKeys() {
-            return Collections.emptySet();
+        public Set<VertexComputeKey> getVertexComputeKeys() {
+            return Collections.singleton(VertexComputeKey.of("counter", false));
         }
 
         @Override
@@ -1131,8 +1134,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getMemoryComputeKeys() {
-            return new HashSet<>(Arrays.asList("test"));
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return Collections.singleton(MemoryComputeKey.of("test", MemoryComputeKey.setOperator(), false));
         }
 
         @Override
@@ -1378,8 +1381,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getElementComputeKeys() {
-            return Collections.singleton("money");
+        public Set<VertexComputeKey> getVertexComputeKeys() {
+            return Collections.singleton(VertexComputeKey.of("money", false));
         }
 
         @Override
@@ -1438,7 +1441,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
                 throw new IllegalStateException(e.getMessage(), e);
             }
             if (!this.announced) {
-                memory.incr("workerCount", 1l);
+                memory.add("workerCount", 1l);
                 this.announced = true;
             }
         }
@@ -1449,8 +1452,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getMemoryComputeKeys() {
-            return new HashSet<>(Arrays.asList("workerCount"));
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return Collections.singleton(MemoryComputeKey.<Long>of("workerCount", MemoryComputeKey.sumLongOperator(), false));
         }
 
         /*public void workerIterationStart(final Memory memory) {
@@ -1928,8 +1931,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
 
         @Override
-        public Set<String> getElementComputeKeys() {
-            return Collections.singleton("age");
+        public Set<VertexComputeKey> getVertexComputeKeys() {
+            return Collections.singleton(VertexComputeKey.of("age", false));
         }
 
         @Override
@@ -1942,6 +1945,4 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
             return GraphComputer.Persist.VERTEX_PROPERTIES;
         }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
index daad7ef..2f4625f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/ComputerSubmissionHelper.java
@@ -31,7 +31,7 @@ import com.google.common.base.Function;
 
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 
-public class ComputerSubmissionHelper {
+public final class ComputerSubmissionHelper {
 
     /**
      * Creates a {@link Executors#newSingleThreadExecutor(ThreadFactory)} configured

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
deleted file mode 100644
index e4648c0..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/Rule.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.util;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tinkerpop.gremlin.util.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class Rule implements Writable, Serializable {
-
-    public enum Operation {
-        OR {
-            public Boolean compute(final Object first, final Object second) {
-                if (null == first)
-                    return (Boolean) second;
-                else if (null == second)
-                    return (Boolean) first;
-                else
-                    return (Boolean) first || (Boolean) second;
-            }
-        }, AND {
-            public Boolean compute(final Object first, final Object second) {
-                if (null == first)
-                    return (Boolean) second;
-                else if (null == second)
-                    return (Boolean) first;
-                else
-                    return (Boolean) first && (Boolean) second;
-            }
-        }, INCR {
-            public Long compute(final Object first, final Object second) {
-                if (null == first)
-                    return (Long) second;
-                else if (null == second)
-                    return (Long) first;
-                else
-                    return (Long) first + (Long) second;
-
-            }
-        }, SET {
-            public Object compute(final Object first, final Object second) {
-                return null == second ? first : second;
-            }
-        }, NO_OP {
-            public Object compute(final Object first, final Object second) {
-                return null == first ? second : first;
-            }
-        };
-
-        public abstract Object compute(final Object first, final Object second);
-    }
-
-    private Operation operation;
-    private Object object;
-
-    public Rule(final Operation operation, final Object object) {
-        this.operation = operation;
-        this.object = object;
-    }
-
-    public Operation getOperation() {
-        return this.operation;
-    }
-
-    public <R> R getObject() {
-        return (R) this.object;
-    }
-
-    public String toString() {
-        return "rule[" + this.operation + ":" + this.object + "]";
-    }
-
-    @Override
-    public void write(final DataOutput output) throws IOException {
-        WritableUtils.writeVInt(output, this.operation.ordinal());
-        final byte[] objectBytes = Serializer.serializeObject(this.object);
-        WritableUtils.writeVInt(output, objectBytes.length);
-        output.write(objectBytes);
-    }
-
-    @Override
-    public void readFields(final DataInput input) throws IOException {
-        this.operation = Operation.values()[WritableUtils.readVInt(input)];
-        final int objectLength = WritableUtils.readVInt(input);
-        final byte[] objectBytes = new byte[objectLength];
-        for (int i = 0; i < objectLength; i++) {
-            objectBytes[i] = input.readByte();
-        }
-        try {
-            this.object = Serializer.deserializeObject(objectBytes);
-        } catch (final ClassNotFoundException e) {
-            throw new IOException(e.getMessage(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
new file mode 100644
index 0000000..d13a207
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.AccumulatorParam;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+
+/**
+ * A Spark {@link AccumulatorParam} that merges values with the reducer of a {@link MemoryComputeKey}.
+ * <p>
+ * {@code null} is the zero element (see {@link #zero(Object)}): it stands for "nothing accumulated yet"
+ * and is never handed to the reducer.
+ *
+ * @param <A> the type of the accumulated value
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MemoryAccumulator<A> implements AccumulatorParam<A> {
+
+    private final MemoryComputeKey<A> memoryComputeKey;
+
+    /**
+     * @param memoryComputeKey the compute key whose reducer is used to merge two non-null values
+     */
+    public MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey) {
+        this.memoryComputeKey = memoryComputeKey;
+    }
+
+    /**
+     * Folds {@code b} into the running value {@code a}.
+     *
+     * @param a the running value, or {@code null} if nothing has been accumulated yet
+     * @param b the value to add, or {@code null} if it is the zero element
+     * @return the other argument when one is {@code null}, otherwise the reducer applied to {@code (a, b)}
+     */
+    @Override
+    public A addAccumulator(final A a, final A b) {
+        return this.reduce(a, b);
+    }
+
+    /**
+     * Merges two accumulated values; same null handling as {@link #addAccumulator(Object, Object)}.
+     */
+    @Override
+    public A addInPlace(final A a, final A b) {
+        return this.reduce(a, b);
+    }
+
+    /**
+     * @param a the initial value (ignored)
+     * @return {@code null}, the zero element of this accumulator
+     */
+    @Override
+    public A zero(final A a) {
+        return null;
+    }
+
+    // null is the identity on BOTH sides: an accumulator that was never written to still holds the zero
+    // (null) and may show up as either operand, so only two real values are passed to the reducer.
+    private A reduce(final A a, final A b) {
+        if (null == a)
+            return b;
+        if (null == b)
+            return a;
+        return this.memoryComputeKey.getReducer().apply(a, b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
deleted file mode 100644
index bebd283..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer;
-
-import org.apache.spark.AccumulatorParam;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class RuleAccumulator implements AccumulatorParam<Rule> {
-
-    @Override
-    public Rule addAccumulator(final Rule a, final Rule b) {
-        if (a.getOperation().equals(Rule.Operation.NO_OP))
-            return b;
-        if (b.getOperation().equals(Rule.Operation.NO_OP))
-            return a;
-        else
-            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
-    }
-
-    @Override
-    public Rule addInPlace(final Rule a, final Rule b) {
-        if (a.getOperation().equals(Rule.Operation.NO_OP))
-            return b;
-        if (b.getOperation().equals(Rule.Operation.NO_OP))
-            return a;
-        else
-            return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
-    }
-
-    @Override
-    public Rule zero(final Rule rule) {
-        return new Rule(Rule.Operation.NO_OP, null);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index fd3de43..456c81b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
@@ -44,7 +45,6 @@ import scala.Tuple2;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -87,8 +87,7 @@ public final class SparkExecutor {
                 .mapPartitionsToPair(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
-                    final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
-                    final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+                    final String[] elementComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b50a43ce/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 4d9280f..dee6ced 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -51,6 +51,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
@@ -251,7 +252,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         }
                     }
                     // write the computed graph to the respective output (rdd or output format)
-                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
+                    final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : VertexProgramHelper.vertexComputeKeysAsArray(this.vertexProgram.getVertexComputeKeys());
                     computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, elementComputeKeys);
                     if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
                             hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) && !this.persist.equals(Persist.NOTHING)) {