You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/23 20:25:16 UTC

incubator-tinkerpop git commit: New Reducing-based Memory-model implemented. A few kooky things emerged because of this and will discuss in UPGRADE docs. However, all-in-all this is a much nicer model which will lead to significant perofmrance improvemen

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1166 90debb4fe -> 9416e9498


New Reducing-based Memory-model implemented. A few kooky things emerged because of this and will discuss in UPGRADE docs. However, all-in-all this is a much nicer model which will lead to significant perofmrance improvements (still need to benchmark and test). I don't think we will ever deprecate the MapReduce infrastructure. The Memory model is not as flexible and efficient (for certain jobs) as MapReduce.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9416e949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9416e949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9416e949

Branch: refs/heads/TINKERPOP-1166
Commit: 9416e9498a363e8ca1ef69df6cc0d046762e43e3
Parents: 90debb4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Feb 23 10:25:36 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Feb 23 10:25:36 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   |  13 +-
 .../traversal/TraversalVertexProgram.java       |   3 +
 .../traversal/step/map/CountGlobalStep.java     |   2 +-
 .../process/traversal/step/map/FoldStep.java    |   4 +-
 .../traversal/step/map/GroupCountStep.java      |   7 +-
 .../process/traversal/step/map/GroupStep.java   |  11 +-
 .../traversal/step/map/GroupStepV3d0.java       | 182 ++++---------------
 .../traversal/step/map/MaxGlobalStep.java       |   2 +-
 .../traversal/step/map/MeanGlobalStep.java      |  12 +-
 .../traversal/step/map/MinGlobalStep.java       |   2 +-
 .../traversal/step/map/SumGlobalStep.java       |   2 +-
 .../process/traversal/step/map/TreeStep.java    |   2 +-
 .../gremlin/structure/io/gryo/GryoMapper.java   |  26 ++-
 .../hadoop/structure/io/ObjectWritable.java     |   4 +-
 .../process/computer/MemoryAccumulator.java     |  23 +--
 .../spark/process/computer/SparkMemory.java     |  28 ++-
 .../io/PersistedInputOutputRDDTest.java         |   8 +-
 17 files changed, 122 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index f3c1624..5055903 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -196,15 +196,16 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 }
                 // execute the job and wait until it completes (if it fails, throw an exception)
                 if (!job.run(true))
-                    throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs");  // how do I get the exception that occured?
+                    throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs: " + job.getInternalJob().getStatus().getFailureInfo());
                 // add vertex program memory values to the return memory
-                for (final MemoryComputeKey memoryKey : this.vertexProgram.getMemoryComputeKeys()) {
-                    if (!memoryKey.isTransient() && storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()))) {
-                        final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey())));
+                for (final MemoryComputeKey memoryComputeKey : this.vertexProgram.getMemoryComputeKeys()) {
+                    if (!memoryComputeKey.isTransient() && storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryComputeKey.getKey()))) {
+                        final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryComputeKey.getKey())));
                         if (iterator.hasNext()) {
-                            this.memory.set(memoryKey.getKey(), iterator.next().getValue());
+                            this.memory.set(memoryComputeKey.getKey(), iterator.next().getValue());
                         }
-                        storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey.getKey()));
+                        // vertex program memory items are not stored on disk
+                        storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryComputeKey.getKey()));
                     }
                 }
                 final Path path = new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 92ea9eb..1d29f86 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStepV3d0;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
@@ -189,6 +190,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 if (memory.exists(ReducingBarrierStep.REDUCING)) {
                     if (reducingBarrierStep instanceof GroupStep)
                         memory.set(ReducingBarrierStep.REDUCING, ((GroupStep) reducingBarrierStep).getReducedMap(memory.get(ReducingBarrierStep.REDUCING)));
+                    else if(reducingBarrierStep instanceof GroupStepV3d0)
+                        memory.set(ReducingBarrierStep.REDUCING, ((GroupStepV3d0) reducingBarrierStep).getReducedMap(memory.get(ReducingBarrierStep.REDUCING)));
                     else
                         memory.set(ReducingBarrierStep.REDUCING, FinalGet.tryFinalGet(memory.get(ReducingBarrierStep.REDUCING)));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
index 4d72146..2c7cf0c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
@@ -55,7 +55,7 @@ public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> {
 
     ///////////
 
-    private static class CountBiOperator implements BinaryOperator<Long>, Serializable {
+    public static class CountBiOperator implements BinaryOperator<Long>, Serializable {
 
         @Override
         public Long apply(final Long mutatingSeed, final Long count) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
index d9dec06..130f53a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
@@ -72,7 +72,7 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
 
     /////////
 
-    private static class ListBiOperator<S> implements BinaryOperator<List<S>>, Serializable {
+    public static class ListBiOperator<S> implements BinaryOperator<List<S>>, Serializable {
         @Override
         public List<S> apply(final List<S> mutatingSeed, final List<S> list) {
             mutatingSeed.addAll(list);
@@ -80,7 +80,7 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
         }
     }
 
-    private static class FoldBiOperator<E> implements BinaryOperator<E>, Serializable {
+    public static class FoldBiOperator<E> implements BinaryOperator<E>, Serializable {
 
         private final BiFunction biFunction;
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
index 966d17a..8180950 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
@@ -31,7 +31,6 @@ import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,9 +61,7 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
     }
 
     public Map<E, Long> projectTraverser(final Traverser.Admin<S> traverser) {
-        final Map<E, Long> map = new HashMap<>(); // TODO: make singleton map
-        map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), traverser.bulk());
-        return map;
+        return Collections.singletonMap(TraversalUtil.applyNullable(traverser, this.keyTraversal), traverser.bulk());
     }
 
     @Override
@@ -96,7 +93,7 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
 
     ///////////
 
-    private static class GroupCountBiOperator<E> implements BinaryOperator<Map<E, Long>>, Serializable {
+    public static final class GroupCountBiOperator<E> implements BinaryOperator<Map<E, Long>>, Serializable {
 
         @Override
         public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Map<E, Long> map) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index 00d9a8d..ba4a840 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -79,9 +80,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         } else
             traverserSet.add(traverser);
 
-        final Map<K, V> map = new HashMap<>();
-        map.put(key, (V) traverserSet);
-        return map;
+        return Collections.singletonMap(key, (V) traverserSet);
     }
 
     @Override
@@ -152,7 +151,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
 
     ///////////
 
-    private static class GroupComputerBiOperator<S, K, V> implements BinaryOperator<Map<K, V>>, Serializable {
+    public static final class GroupComputerBiOperator<S, K, V> implements BinaryOperator<Map<K, V>>, Serializable {
 
         private GroupComputerBiOperator() {
         }
@@ -171,11 +170,11 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         }
     }
 
-    private static class GroupStandardBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
+    public static final class GroupStandardBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
 
         private final GroupStep groupStep;
 
-        private GroupStandardBiOperator(final GroupStep groupStep) {
+        public GroupStandardBiOperator(final GroupStep groupStep) {
             this.groupStep = groupStep;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
index a2b7f5e..b72dce5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
@@ -19,67 +19,62 @@
 
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 @Deprecated
-public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<K, R>> implements MapReducer, ByModulating, GraphComputing, TraversalParent {
+public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<K, R>> implements ByModulating, GraphComputing, TraversalParent {
 
     private char state = 'k';
 
     private Traversal.Admin<S, K> keyTraversal = null;
     private Traversal.Admin<S, V> valueTraversal = null;
     private Traversal.Admin<Collection<V>, R> reduceTraversal = null;
-    private boolean byPass = false;
 
     public GroupStepV3d0(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) new GroupMapSupplierV3d0());
-       // this.setBiFunction(new GroupBiFunction(this));
+        this.setReducingBiOperator(new GroupBiOperatorV3d0<>());
     }
 
     @Override
-    public Map<K, R> projectTraverser(Traverser.Admin<S> traverser) {
-        return null;
+    public Map<K, R> projectTraverser(final Traverser.Admin<S> traverser) {
+        final K key = TraversalUtil.applyNullable(traverser, this.keyTraversal);
+        final BulkSet<V> values = new BulkSet<>();
+        final V value = TraversalUtil.applyNullable(traverser, this.valueTraversal);
+        TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+        return Collections.singletonMap(key, (R) values);
     }
 
     @Override
     public void onGraphComputer() {
-        this.byPass = true;
+        this.setSeedSupplier((Supplier) HashMapSupplier.instance());
+        this.setReducingBiOperator(new GroupBiOperatorV3d0<>());
     }
 
     @Override
@@ -94,10 +89,6 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
         return children;
     }
 
-    public Traversal.Admin<Collection<V>, R> getReduceTraversal() {
-        return this.reduceTraversal;
-    }
-
     @Override
     public void modulateBy(final Traversal.Admin<?, ?> kvrTraversal) {
         if ('k' == this.state) {
@@ -128,7 +119,6 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
             clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
         if (null != this.reduceTraversal)
             clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
-       // clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
         return clone;
     }
 
@@ -142,63 +132,49 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
     }
 
     @Override
-    public MapReduce<K, Collection<V>, K, R, Map<K, R>> getMapReduce() {
-        return new GroupMapReduceV3d0<>(this);
+    public String toString() {
+        return StringFactory.stepString(this, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
     }
 
-    @Override
-    public Traverser<Map<K, R>> processNextStart() {
-        if (this.byPass) {
-            final Traverser.Admin<S> traverser = this.starts.next();
-            final Object[] kvPair = new Object[]{TraversalUtil.applyNullable(traverser, (Traversal.Admin<S, Map>) this.keyTraversal), TraversalUtil.applyNullable(traverser, (Traversal.Admin<S, Map>) this.valueTraversal)};
-            return traverser.asAdmin().split(kvPair, (Step) this);
-        } else {
-            return super.processNextStart();
+    public Map<K, R> getReducedMap(final Map<K, Collection<V>> valueMap) {
+        final Map<K, R> reducedMap = new HashMap<>();
+        for (final K key : valueMap.keySet()) {
+            final R r = TraversalUtil.applyNullable(valueMap.get(key), this.reduceTraversal);
+            reducedMap.put(key, r);
         }
+        return reducedMap;
     }
 
-    @Override
-    public String toString() {
-        return StringFactory.stepString(this, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
-    }
+    ////////////
 
-    ///////////
+    public static class GroupBiOperatorV3d0<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
 
-    private static class GroupBiFunction<S, K, V> implements BiFunction<Map<K, Collection<V>>, Traverser.Admin<S>, Map<K, Collection<V>>>, Serializable {
+        private GroupBiOperatorV3d0() {
 
-        private final GroupStepV3d0<S, K, V, ?> groupStep;
-
-        private GroupBiFunction(final GroupStepV3d0<S, K, V, ?> groupStep) {
-            this.groupStep = groupStep;
         }
 
         @Override
-        public Map<K, Collection<V>> apply(final Map<K, Collection<V>> mutatingSeed, final Traverser.Admin<S> traverser) {
-            final K key = TraversalUtil.applyNullable(traverser, this.groupStep.keyTraversal);
-            final V value = TraversalUtil.applyNullable(traverser, this.groupStep.valueTraversal);
-            Collection<V> values = mutatingSeed.get(key);
-            if (null == values) {
-                values = new BulkSet<>();
-                mutatingSeed.put(key, values);
+        public Map<K, V> apply(final Map<K, V> mutatingSeed, final Map<K, V> map) {
+            for (final K key : map.keySet()) {
+                final BulkSet<V> values = (BulkSet<V>) map.get(key);
+                BulkSet<V> seedValues = (BulkSet<V>) mutatingSeed.get(key);
+                if (null == seedValues) {
+                    seedValues = new BulkSet<>();
+                    mutatingSeed.put(key, (V) seedValues);
+                }
+                seedValues.addAll(values);
             }
-            TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
             return mutatingSeed;
         }
     }
 
-    //////////
+    ////////
 
     private class GroupMapV3d0 extends HashMap<K, Collection<V>> implements FinalGet<Map<K, R>> {
 
         @Override
         public Map<K, R> getFinal() {
-            if (null == GroupStepV3d0.this.reduceTraversal)
-                return (Map<K, R>) this;
-            else {
-                final Map<K, R> reduceMap = new HashMap<>();
-                this.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.applyNullable(vv, GroupStepV3d0.this.reduceTraversal)));
-                return reduceMap;
-            }
+            return GroupStepV3d0.this.getReducedMap(this);
         }
     }
 
@@ -212,92 +188,4 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
             return new GroupMapV3d0();
         }
     }
-
-    ///////////
-
-    public static final class GroupMapReduceV3d0<K, V, R> implements MapReduce<K, Collection<V>, K, R, Map<K, R>> {
-
-        public static final String GROUP_BY_STEP_STEP_ID = "gremlin.groupStep.stepId";
-
-        private String groupStepId;
-        private Traversal.Admin<Collection<V>, R> reduceTraversal;
-
-        private GroupMapReduceV3d0() {
-
-        }
-
-        public GroupMapReduceV3d0(final GroupStepV3d0<?, K, V, R> step) {
-            this.groupStepId = step.getId();
-            this.reduceTraversal = step.getReduceTraversal();
-        }
-
-        @Override
-        public void storeState(final Configuration configuration) {
-            MapReduce.super.storeState(configuration);
-            configuration.setProperty(GROUP_BY_STEP_STEP_ID, this.groupStepId);
-        }
-
-        @Override
-        public void loadState(final Graph graph, final Configuration configuration) {
-            this.groupStepId = configuration.getString(GROUP_BY_STEP_STEP_ID);
-            this.reduceTraversal = ((GroupStepV3d0) new TraversalMatrix<>(TraversalVertexProgram.getTraversal(graph, configuration)).getStepById(this.groupStepId)).getReduceTraversal();
-        }
-
-        @Override
-        public boolean doStage(final Stage stage) {
-            return !stage.equals(Stage.COMBINE);
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<K, Collection<V>> emitter) {
-            vertex.<TraverserSet<Object[]>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> {
-                final Object[] objects = traverser.get();
-                for (int i = 0; i < traverser.bulk(); i++) {
-                    if (objects[1] instanceof Collection)
-                        emitter.emit((K) objects[0], (Collection<V>) objects[1]);
-                    else {
-                        final List<V> collection = new ArrayList<>();
-                        collection.add((V) objects[1]);
-                        emitter.emit((K) objects[0], collection);
-                    }
-                }
-            }));
-        }
-
-        @Override
-        public void reduce(final K key, final Iterator<Collection<V>> values, final ReduceEmitter<K, R> emitter) {
-            final Set<V> set = new BulkSet<>();
-            values.forEachRemaining(set::addAll);
-            emitter.emit(key, TraversalUtil.applyNullable(set, this.reduceTraversal));
-        }
-
-        @Override
-        public Map<K, R> generateFinalResult(final Iterator<KeyValue<K, R>> keyValues) {
-            final Map<K, R> map = new HashMap<>();
-            keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
-            return map;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public GroupMapReduceV3d0<K, V, R> clone() {
-            try {
-                final GroupMapReduceV3d0<K, V, R> clone = (GroupMapReduceV3d0<K, V, R>) super.clone();
-                if (null != clone.reduceTraversal)
-                    clone.reduceTraversal = this.reduceTraversal.clone();
-                return clone;
-            } catch (final CloneNotSupportedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return StringFactory.mapReduceString(this, this.getMemoryKey());
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
index 7db28e5..c9466db 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
@@ -57,7 +57,7 @@ public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S
 
     /////
 
-    private static class MaxGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+    public static class MaxGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
         @Override
         public S apply(final S mutatingSeed, final S number) {
             return !NAN.equals(mutatingSeed) ? (S) max(mutatingSeed, number) : number;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
index 53688cd..1f8c99e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
@@ -61,11 +61,19 @@ public final class MeanGlobalStep<S extends Number, E extends Number> extends Re
 
     /////
 
-    private static class MeanGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+    public static final class MeanGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
 
         @Override
         public S apply(final S mutatingSeed, final S number) {
-            return (S) (number instanceof MeanNumber ? ((MeanNumber) mutatingSeed).add((MeanNumber) number) : ((MeanNumber) mutatingSeed).add(number, 1l));
+            if (mutatingSeed instanceof MeanNumber) {
+                return (number instanceof MeanNumber) ?
+                        (S) ((MeanNumber) mutatingSeed).add((MeanNumber) number) :
+                        (S) ((MeanNumber) mutatingSeed).add(number, 1l);
+            } else {
+                return (number instanceof MeanNumber) ?
+                        (S) ((MeanNumber) number).add(mutatingSeed, 1l) :
+                        (S) new MeanNumber(number, 1l).add(mutatingSeed, 1l);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
index a1bc2c4..bdcad4c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
@@ -56,7 +56,7 @@ public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S
 
     /////
 
-    private static class MinGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+    public static class MinGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
         @Override
         public S apply(final S mutatingSeed, final S number) {
             return !NAN.equals(mutatingSeed) ? (S) min(mutatingSeed, number) : number;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
index bea7717..2311344 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
@@ -61,7 +61,7 @@ public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S
 
     /////
 
-    private static class SumGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
+    public static class SumGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
 
         @Override
         public S apply(final S mutatingSeed, final S number) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
index fa41210..3aeecbf 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
@@ -109,7 +109,7 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements T
 
     ///////////
 
-    private static class TreeBiOperator implements BinaryOperator<Tree>, Serializable {
+    public static final class TreeBiOperator implements BinaryOperator<Tree>, Serializable {
 
         @Override
         public Tree apply(final Tree mutatingSeed, final Tree tree) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 94fd7d6..c8fceb7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -23,6 +23,16 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.process.traversal.Contains;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStepV3d0;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SumGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.TreeStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_P_S_SE_SL_Traverser;
@@ -317,7 +327,21 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.OrOperator.class, null, 108));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SetOperator.class, null, 109));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumLongOperator.class, null, 110));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumIntegerOperator.class, null, 111)); // ***LAST ID**
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MemoryComputeKey.SumIntegerOperator.class, null, 111));
+
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(CountGlobalStep.CountBiOperator.class, null, 112));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(FoldStep.ListBiOperator.class, null, 113));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(FoldStep.FoldBiOperator.class, null, 114));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupCountStep.GroupCountBiOperator.class, null, 115));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStep.GroupComputerBiOperator.class, null, 116));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStep.GroupStandardBiOperator.class, null, 117));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MaxGlobalStep.MaxGlobalBiOperator.class, null, 118));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MeanGlobalStep.MeanGlobalBiOperator.class, null, 119));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MeanGlobalStep.MeanNumber.class, null, 120));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(MinGlobalStep.MinGlobalBiOperator.class, null, 121));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(SumGlobalStep.SumGlobalBiOperator.class, null, 122));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TreeStep.TreeBiOperator.class, null, 123));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStepV3d0.GroupBiOperatorV3d0.class, null, 124)); // ***LAST ID**
         }};
 
         private final List<IoRegistry> registries = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
index 5c45400..a6b11ca 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java
@@ -68,7 +68,7 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
                 // the type is embedded in the stream so it can just read it from there and return it as needed.
                 // presumably that will cast nicely to T
                 return (T) gryoReader.readObject(new ByteArrayInputStream(WritableUtils.readCompressedByteArray(input)), Object.class);
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
         });
@@ -81,7 +81,7 @@ public final class ObjectWritable<T> implements WritableComparable<ObjectWritabl
                 final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                 gryoWriter.writeObject(outputStream, this.t);
                 WritableUtils.writeCompressedByteArray(output, outputStream.toByteArray());
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
index 8e88b9f..cf8cb25 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
@@ -20,12 +20,13 @@
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
 import org.apache.spark.AccumulatorParam;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryAccumulator<A> implements AccumulatorParam<A> {
+public final class MemoryAccumulator<A> implements AccumulatorParam<ObjectWritable<A>> {
 
     private final MemoryComputeKey<A> memoryComputeKey;
 
@@ -34,25 +35,21 @@ public final class MemoryAccumulator<A> implements AccumulatorParam<A> {
     }
 
     @Override
-    public A addAccumulator(final A a, final A b) {
-        if (null == a)
+    public ObjectWritable<A> addAccumulator(final ObjectWritable<A> a, final ObjectWritable<A> b) {
+        if (a.isEmpty())
             return b;
-        if (null == b)
+        if (b.isEmpty())
             return a;
-        return this.memoryComputeKey.getReducer().apply(a, b);
+        return new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(a.get(), b.get()));
     }
 
     @Override
-    public A addInPlace(final A a, final A b) {
-        if (null == a)
-            return b;
-        if (null == b)
-            return a;
-        return this.memoryComputeKey.getReducer().apply(a, b);
+    public ObjectWritable<A> addInPlace(final ObjectWritable<A> a, final ObjectWritable<A> b) {
+        return this.addAccumulator(a, b);
     }
 
     @Override
-    public A zero(final A a) {
-        return null;
+    public ObjectWritable<A> zero(final ObjectWritable<A> a) {
+        return ObjectWritable.empty();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
index 8262513..cb672e1 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
@@ -46,22 +47,23 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     public final Map<String, MemoryComputeKey> memoryKeys = new HashMap<>();
     private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
     private final AtomicLong runtime = new AtomicLong(0l);
-    private final Map<String, Accumulator> memory = new HashMap<>();
+    private final Map<String, Accumulator<ObjectWritable>> memory = new HashMap<>();
     private Broadcast<Map<String, Object>> broadcast;
     private boolean inTask = false;
 
     public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
         if (null != vertexProgram) {
             for (final MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) {
-                MemoryHelper.validateKey(key.getKey());
                 this.memoryKeys.put(key.getKey(), key);
             }
         }
         for (final MapReduce mapReduce : mapReducers) {
             this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
         }
-        for (final MemoryComputeKey key : this.memoryKeys.values()) {
-            this.memory.put(key.getKey(), sparkContext.accumulator(null, key.getKey(), new MemoryAccumulator<>(key)));
+        for (final MemoryComputeKey memoryComputeKey : this.memoryKeys.values()) {
+            this.memory.put(
+                    memoryComputeKey.getKey(),
+                    sparkContext.accumulator(ObjectWritable.empty(), memoryComputeKey.getKey(), new MemoryAccumulator<>(memoryComputeKey)));
         }
         this.broadcast = sparkContext.broadcast(new HashMap<>());
     }
@@ -73,7 +75,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         else {
             final Set<String> trueKeys = new HashSet<>();
             this.memory.forEach((key, value) -> {
-                if (value.value() != null)
+                if (!value.value().isEmpty())
                     trueKeys.add(key);
             });
             return Collections.unmodifiableSet(trueKeys);
@@ -107,18 +109,18 @@ public final class SparkMemory implements Memory.Admin, Serializable {
 
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
-        final R r = this.getValue(key);
-        if (null == r)
+        final ObjectWritable<R> r = (ObjectWritable<R>) (this.inTask ? this.broadcast.value().get(key) : this.memory.get(key).value());
+        if (r.isEmpty())
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
-            return r;
+            return r.get();
     }
 
     @Override
     public void add(final String key, final Object value) {
         checkKeyValue(key, value);
         if (this.inTask)
-            this.memory.get(key).add(value);
+            this.memory.get(key).add(new ObjectWritable<>(value));
         else
             throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
     }
@@ -129,7 +131,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         if (this.inTask)
             throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
         else
-            this.memory.get(key).setValue(value);
+            this.memory.get(key).setValue(new ObjectWritable<>(value));
     }
 
     @Override
@@ -149,7 +151,7 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         this.broadcast.destroy(true); // do we need to block?
         final Map<String, Object> toBroadcast = new HashMap<>();
         this.memory.forEach((key, object) -> {
-            if (null != object.value())
+            if (!object.value().isEmpty())
                 toBroadcast.put(key, object.value());
         });
         this.broadcast = sparkContext.broadcast(toBroadcast);
@@ -160,8 +162,4 @@ public final class SparkMemory implements Memory.Admin, Serializable {
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }
-
-    private <R> R getValue(final String key) {
-        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9416e949/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 040dd39..dd2dec7 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -139,7 +139,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
 
     @Test
     public void shouldPersistRDDAcrossJobs() throws Exception {
-
+        Spark.create("local[4]");
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
         final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString());
         final Configuration configuration = super.getBaseConfiguration();
@@ -157,8 +157,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
                                 "gremlin-groovy",
                                 "g.V().count()").create(graph)).submit().get();
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
-        assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
-        assertEquals(2, Spark.getContext().getPersistentRDDs().size());
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
         ///////
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
@@ -167,8 +166,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         graph = GraphFactory.open(configuration);
         assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue());
         assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
-        assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName, Graph.Hidden.hide("reducing"))));
-        assertEquals(2, Spark.getContext().getPersistentRDDs().size());
+        assertEquals(1, Spark.getContext().getPersistentRDDs().size());
         Spark.close();
     }