You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/24 00:21:11 UTC

incubator-tinkerpop git commit: The next big thing in the new MemoryComputeKey model -- broadcasting. It is possible to state that a MemoryKey will NOT be read by workers and thus, no need to send the data to the workers on each iteration. Added GraphCom

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1166 d77bf58c8 -> 14d0d4843


The next big thing in the new MemoryComputeKey model -- broadcasting. It is possible to state that a MemoryKey will NOT be read by workers and thus, no need to send the data to the workers on each iteration. Added GraphComputerTest.shouldSupportBroadcastKeys(). SparkGraphComputer supports this natively, TinkerGraphComputer simply hides the data when trying to be accessed by workers, and GiraphGraphComputer (like TinkerGraphComputer) but I will be able to at least clear the data immediately once its sent (future work). Cleaned up GroupStep a bit. Have a consistent naming convention for workers vs. master --- inExecute. All XXXMemory implementations use it so its easier to see how they all relate to each other. Added a GraphComputerTest to make sure exceptions are correct around get(), add(), set() for various situations -- found a couple of inconsistencies that are now fixed up.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/14d0d484
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/14d0d484
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/14d0d484

Branch: refs/heads/TINKERPOP-1166
Commit: 14d0d48435640572523d942d56b9a7a6dd128969
Parents: d77bf58
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Feb 23 16:21:02 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Feb 23 16:21:02 2016 -0700

----------------------------------------------------------------------
 .../giraph/process/computer/GiraphMemory.java   |  54 +++---
 .../process/computer/MemoryComputeKey.java      |  12 +-
 .../peerpressure/PeerPressureVertexProgram.java |   2 +-
 .../traversal/TraversalVertexProgram.java       |  10 +-
 .../process/computer/util/MapMemory.java        |   3 +-
 .../process/traversal/step/map/GroupStep.java   |  13 +-
 .../traversal/step/map/GroupStepV3d0.java       |   9 -
 .../step/util/ReducingBarrierStep.java          |   1 -
 .../process/computer/GraphComputerTest.java     | 191 ++++++++++++++++---
 .../process/computer/SparkGraphComputer.java    |   4 +-
 .../spark/process/computer/SparkMemory.java     |  50 ++---
 .../process/computer/TinkerMemory.java          |   7 +-
 12 files changed, 257 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
index bc7cc04..5e87f20 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphMemory.java
@@ -51,8 +51,8 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     private VertexProgram<?> vertexProgram;
     private GiraphWorkerContext worker;
-    private Map<String, MemoryComputeKey> memoryKeys;
-    private boolean isMasterCompute = true;
+    private Map<String, MemoryComputeKey> memoryComputeKeys;
+    private boolean inExecute = false;
     private long startTime = System.currentTimeMillis();
 
     public GiraphMemory() {
@@ -62,9 +62,9 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     public GiraphMemory(final GiraphWorkerContext worker, final VertexProgram<?> vertexProgram) {
         this.worker = worker;
         this.vertexProgram = vertexProgram;
-        this.memoryKeys = new HashMap<>();
-        this.vertexProgram.getMemoryComputeKeys().forEach(key -> this.memoryKeys.put(key.getKey(), key));
-        this.isMasterCompute = false;
+        this.memoryComputeKeys = new HashMap<>();
+        this.vertexProgram.getMemoryComputeKeys().forEach(key -> this.memoryComputeKeys.put(key.getKey(), key));
+        this.inExecute = true;
     }
 
 
@@ -76,14 +76,14 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     @Override
     public void compute() {
-        this.isMasterCompute = true;
+        this.inExecute = false;
         if (0 == this.getSuperstep()) { // setup
             final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getConf());
             this.vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            this.memoryKeys = new HashMap<>();
-            this.vertexProgram.getMemoryComputeKeys().forEach(key -> this.memoryKeys.put(key.getKey(), key));
+            this.memoryComputeKeys = new HashMap<>();
+            this.vertexProgram.getMemoryComputeKeys().forEach(key -> this.memoryComputeKeys.put(key.getKey(), key));
             try {
-                for (final MemoryComputeKey key : this.memoryKeys.values()) {
+                for (final MemoryComputeKey key : this.memoryComputeKeys.values()) {
                     this.registerPersistentAggregator(key.getKey(), MemoryAggregator.class);
                 }
             } catch (final Exception e) {
@@ -96,11 +96,12 @@ public final class GiraphMemory extends MasterCompute implements Memory {
                 final MapMemory memory = new MapMemory(this);
                 // a hack to get the last iteration memory values to stick
                 this.vertexProgram.terminate(memory);
+
                 final String outputLocation = this.getConf().get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
                 if (null != outputLocation) {
                     try {
                         for (final String key : this.keys()) {
-                            if (!this.memoryKeys.get(key).isTransient()) { // do not write transient memory keys to disk
+                            if (!this.memoryComputeKeys.get(key).isTransient()) { // do not write transient memory keys to disk
                                 final SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(this.getConf()), this.getConf(), new Path(outputLocation + "/" + key), ObjectWritable.class, ObjectWritable.class);
                                 writer.append(ObjectWritable.getNullObjectWritable(), new ObjectWritable<>(memory.get(key)));
                                 writer.close();
@@ -121,11 +122,11 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     @Override
     public int getIteration() {
-        if (this.isMasterCompute) {
+        if (this.inExecute) {
+            return (int) this.worker.getSuperstep();
+        } else {
             final int temp = (int) this.getSuperstep();
             return temp == 0 ? temp : temp - 1;
-        } else {
-            return (int) this.worker.getSuperstep();
         }
     }
 
@@ -136,21 +137,26 @@ public final class GiraphMemory extends MasterCompute implements Memory {
 
     @Override
     public Set<String> keys() {
-        return this.memoryKeys.values().stream().filter(key -> this.exists(key.getKey())).map(MemoryComputeKey::getKey).collect(Collectors.toSet());
+        return this.memoryComputeKeys.values().stream().filter(key -> this.exists(key.getKey())).map(MemoryComputeKey::getKey).collect(Collectors.toSet());
     }
 
     @Override
     public boolean exists(final String key) {
-        final ObjectWritable value = this.isMasterCompute ? this.getAggregatedValue(key) : this.worker.getAggregatedValue(key);
+        if (this.inExecute && this.memoryComputeKeys.containsKey(key) && !this.memoryComputeKeys.get(key).isBroadcast())
+            return false;
+        final ObjectWritable value = this.inExecute ? this.worker.getAggregatedValue(key) : this.getAggregatedValue(key);
         return null != value && !value.isEmpty();
     }
 
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
-        //this.checkKey(key);
-        final ObjectWritable<Pair<BinaryOperator, Object>> value = this.isMasterCompute ?
-                this.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key) :
-                this.worker.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key);
+        if (!this.memoryComputeKeys.containsKey(key))
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        if (this.inExecute && !this.memoryComputeKeys.get(key).isBroadcast())
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        final ObjectWritable<Pair<BinaryOperator, Object>> value = this.inExecute ?
+                this.worker.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key) :
+                this.<ObjectWritable<Pair<BinaryOperator, Object>>>getAggregatedValue(key);
         if (null == value || value.isEmpty())
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
@@ -160,17 +166,17 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     @Override
     public void set(final String key, final Object value) {
         this.checkKeyValue(key, value);
-        if (!this.isMasterCompute)   // only called on setup() and terminate()
+        if (this.inExecute)
             throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
-        this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
+        this.setAggregatedValue(key, new ObjectWritable<>(new Pair<>(this.memoryComputeKeys.get(key).getReducer(), value)));
     }
 
     @Override
     public void add(final String key, final Object value) {
         this.checkKeyValue(key, value);
-        if (this.isMasterCompute)
+        if (!this.inExecute)
             throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
-        this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryKeys.get(key).getReducer(), value)));
+        this.worker.aggregate(key, new ObjectWritable<>(new Pair<>(this.memoryComputeKeys.get(key).getReducer(), value)));
     }
 
     @Override
@@ -189,7 +195,7 @@ public final class GiraphMemory extends MasterCompute implements Memory {
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.containsKey(key))
+        if (!this.memoryComputeKeys.containsKey(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
index 71499f0..d31164c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
@@ -32,11 +32,13 @@ public final class MemoryComputeKey<A> implements Serializable {
     private final String key;
     private final BinaryOperator<A> reducer;
     private final boolean isTransient;
+    private final boolean isBroadcast;
 
-    private MemoryComputeKey(final String key, final BinaryOperator<A> reducer, final boolean isTransient) {
+    private MemoryComputeKey(final String key, final BinaryOperator<A> reducer, final boolean isBroadcast, final boolean isTransient) {
         this.key = key;
         this.reducer = reducer;
         this.isTransient = isTransient;
+        this.isBroadcast = isBroadcast;
         MemoryHelper.validateKey(key);
     }
 
@@ -48,6 +50,10 @@ public final class MemoryComputeKey<A> implements Serializable {
         return this.isTransient;
     }
 
+    public boolean isBroadcast() {
+        return this.isBroadcast;
+    }
+
     public BinaryOperator<A> getReducer() {
         return this.reducer;
     }
@@ -62,8 +68,8 @@ public final class MemoryComputeKey<A> implements Serializable {
         return object instanceof MemoryComputeKey && ((MemoryComputeKey) object).key.equals(this.key);
     }
 
-    public static <A> MemoryComputeKey of(final String key, final BinaryOperator<A> reducer, final boolean isTransient) {
-        return new MemoryComputeKey<>(key, reducer, isTransient);
+    public static <A> MemoryComputeKey of(final String key, final BinaryOperator<A> reducer, final boolean isBroadcast, final boolean isTransient) {
+        return new MemoryComputeKey<>(key, reducer, isBroadcast, isTransient);
     }
 
     public static SetOperator setOperator() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
index c702aef..2094660 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
@@ -72,7 +72,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
     private boolean distributeVote = false;
     private String property = CLUSTER;
 
-    private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
+    private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), false, true));
 
     private PeerPressureVertexProgram() {
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 777220f..6580600 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -106,22 +106,22 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void loadState(final Graph graph, final Configuration configuration) {
         if (!configuration.containsKey(TRAVERSAL))
-            throw new IllegalArgumentException("The configuration does not have a traversal supplier: " + TRAVERSAL);
+            throw new IllegalArgumentException("The configuration does not have a traversal: " + TRAVERSAL);
         this.traversal = PureTraversal.loadState(configuration, TRAVERSAL, graph);
         if (!this.traversal.get().isLocked())
             this.traversal.get().applyStrategies();
         this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
-        this.memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
+        this.memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), false, true));
         for (final MapReducer<?, ?, ?, ?, ?> mapReducer : TraversalHelper.getStepsOfAssignableClassRecursively(MapReducer.class, this.traversal.get())) {
             this.mapReducers.add(mapReducer.getMapReduce());
-            this.memoryComputeKeys.add(MemoryComputeKey.of(mapReducer.getMapReduce().getMemoryKey(), MemoryComputeKey.setOperator(), false));
+            this.memoryComputeKeys.add(MemoryComputeKey.of(mapReducer.getMapReduce().getMemoryKey(), MemoryComputeKey.setOperator(), false, false));
         }
         if (!(this.traversal.get().getEndStep() instanceof SideEffectCapStep) && !(this.traversal.get().getEndStep() instanceof ReducingBarrierStep)) {
             this.mapReducers.add(new TraverserMapReduce(this.traversal.get()));
-            this.memoryComputeKeys.add(MemoryComputeKey.of(TraverserMapReduce.TRAVERSERS, MemoryComputeKey.setOperator(), false));
+            this.memoryComputeKeys.add(MemoryComputeKey.of(TraverserMapReduce.TRAVERSERS, MemoryComputeKey.setOperator(), false, false));
         }
         for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class, this.traversal.get())) {
-            this.memoryComputeKeys.add(MemoryComputeKey.of(ReducingBarrierStep.REDUCING, reducingBarrierStep.getBiOperator(), false));
+            this.memoryComputeKeys.add(MemoryComputeKey.of(ReducingBarrierStep.REDUCING, reducingBarrierStep.getBiOperator(), false, false));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
index 7091062..8ba3c91 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapMemory.java
@@ -54,7 +54,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
     }
 
     public void addMapReduceMemoryKey(final MapReduce mapReduce) {
-       this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(),false));
+        this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false, false));
     }
 
     @Override
@@ -73,6 +73,7 @@ public final class MapMemory implements Memory.Admin, Serializable {
 
     @Override
     public void set(final String key, Object value) {
+        // this.checkKeyValue(key, value);
         this.memoryMap.put(key, value);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index 3e34009..a6e9c66 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -22,7 +22,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.GroupStepHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
@@ -62,7 +61,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
     @Override
     public void onGraphComputer() {
         super.onGraphComputer();
-        this.setReducingBiOperator(new GroupBiOperator<>(this));
+        this.setReducingBiOperator(new GroupBiOperator<>());
     }
 
     @Override
@@ -156,16 +155,22 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
 
     public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
 
-        private final GroupStep groupStep;
+        private transient GroupStep groupStep;
+        private final boolean onGraphComputer;
 
         public GroupBiOperator(final GroupStep groupStep) {
             this.groupStep = groupStep;
+            this.onGraphComputer = false;
+        }
+
+        public GroupBiOperator() {
+            this.onGraphComputer = true;
         }
 
         @Override
         public Map<K, V> apply(final Map<K, V> mutatingSeed, final Map<K, V> map) {
             for (final K key : map.keySet()) {
-                if (this.groupStep.onGraphComputer) {
+                if (this.onGraphComputer) {
                     TraverserSet<?> traverserSet = (TraverserSet) mutatingSeed.get(key);
                     if (null == traverserSet) {
                         traverserSet = new TraverserSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
index b52b32b..e4a8c9d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
@@ -22,7 +22,6 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
@@ -41,7 +40,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BinaryOperator;
-import java.util.function.Supplier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -71,13 +69,6 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
     }
 
     @Override
-    public void onGraphComputer() {
-        super.onGraphComputer();
-        this.setSeedSupplier((Supplier) HashMapSupplier.instance());
-        this.setReducingBiOperator(new GroupBiOperatorV3d0<>());
-    }
-
-    @Override
     public <A, B> List<Traversal.Admin<A, B>> getLocalChildren() {
         final List<Traversal.Admin<A, B>> children = new ArrayList<>(3);
         if (null != this.keyTraversal)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
index e40eeb2..c73f083 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
@@ -37,7 +37,6 @@ import java.util.function.Supplier;
 
 public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> implements Barrier, GraphComputing {
 
-
     public static final String REDUCING = Graph.Hidden.hide("reducing");
 
     protected Supplier<E> seedSupplier;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 2f5f71f..6e746f1 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -220,10 +220,10 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
             return new HashSet<>(Arrays.asList(
-                    MemoryComputeKey.of("set", MemoryComputeKey.setOperator(), false),
-                    MemoryComputeKey.of("incr", MemoryComputeKey.sumLongOperator(), false),
-                    MemoryComputeKey.of("and", MemoryComputeKey.andOperator(), false),
-                    MemoryComputeKey.of("or", MemoryComputeKey.orOperator(), false)));
+                    MemoryComputeKey.of("set", MemoryComputeKey.setOperator(), true, false),
+                    MemoryComputeKey.of("incr", MemoryComputeKey.sumLongOperator(), true, false),
+                    MemoryComputeKey.of("and", MemoryComputeKey.andOperator(), true, false),
+                    MemoryComputeKey.of("or", MemoryComputeKey.orOperator(), true, false)));
         }
 
         @Override
@@ -272,7 +272,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
-            return Collections.singleton(MemoryComputeKey.of(null, MemoryComputeKey.orOperator(), false));
+            return Collections.singleton(MemoryComputeKey.of(null, MemoryComputeKey.orOperator(), true, false));
         }
 
         @Override
@@ -322,7 +322,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
-            return Collections.singleton(MemoryComputeKey.of("", MemoryComputeKey.orOperator(), false));
+            return Collections.singleton(MemoryComputeKey.of("", MemoryComputeKey.orOperator(), true, false));
         }
 
         @Override
@@ -345,7 +345,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
     ////////////////////////////////////////////
     @Test
     @LoadGraphWith(MODERN)
-    public void shouldNotAllowSettingUndeclaredMemoryKeys() throws Exception {
+    public void shouldHandleUndeclaredMemoryKeysCorrectly() throws Exception {
         graphProvider.getGraphComputer(graph).program(new VertexProgramE()).submit().get();
     }
 
@@ -353,24 +353,52 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public void setup(final Memory memory) {
             try {
+                memory.get("a");
+                fail("The memory key does not exist and should fail");
+            } catch (Exception e) {
+                validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
+            }
+            try {
                 memory.set("a", true);
                 fail("Setting a memory key that wasn't declared should fail");
-            } catch (IllegalArgumentException e) {
-                assertEquals(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a").getMessage(), e.getMessage());
+            } catch (Exception e) {
+                validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
             }
         }
 
         @Override
         public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
-
+            try {
+                memory.get("a");
+                fail("The memory key does not exist and should fail");
+            } catch (Exception e) {
+                validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
+            }
+            try {
+                memory.add("a", true);
+                fail("Setting a memory key that wasn't declared should fail");
+            } catch (Exception e) {
+                validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
+            }
         }
 
         @Override
         public boolean terminate(final Memory memory) {
+            try {
+                memory.get("a");
+                fail("The memory key does not exist and should fail");
+            } catch (Exception e) {
+                validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
+            }
+            try {
+                memory.set("a", true);
+               // fail("Setting a memory key that wasn't declared should fail");
+            } catch (Exception e) {
+                validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
+            }
             return true;
         }
 
-
         @Override
         public Set<MessageScope> getMessageScopes(final Memory memory) {
             return Collections.emptySet();
@@ -517,8 +545,8 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
             return new HashSet<>(Arrays.asList(
-                    MemoryComputeKey.of("a", MemoryComputeKey.sumIntegerOperator(), false),
-                    MemoryComputeKey.of("b", MemoryComputeKey.sumIntegerOperator(), false)));
+                    MemoryComputeKey.of("a", MemoryComputeKey.sumIntegerOperator(), true, false),
+                    MemoryComputeKey.of("b", MemoryComputeKey.sumIntegerOperator(), true, false)));
         }
 
         @Override
@@ -644,12 +672,12 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
             return new HashSet<>(Arrays.asList(
-                    MemoryComputeKey.of("a", MemoryComputeKey.sumLongOperator(), false),
-                    MemoryComputeKey.of("b", MemoryComputeKey.sumLongOperator(), false),
-                    MemoryComputeKey.of("c", MemoryComputeKey.andOperator(), false),
-                    MemoryComputeKey.of("d", MemoryComputeKey.orOperator(), false),
-                    MemoryComputeKey.of("e", MemoryComputeKey.andOperator(), false),
-                    MemoryComputeKey.of("f", MemoryComputeKey.setOperator(), false)));
+                    MemoryComputeKey.of("a", MemoryComputeKey.sumLongOperator(), true, false),
+                    MemoryComputeKey.of("b", MemoryComputeKey.sumLongOperator(), true, false),
+                    MemoryComputeKey.of("c", MemoryComputeKey.andOperator(), true, false),
+                    MemoryComputeKey.of("d", MemoryComputeKey.orOperator(), true, false),
+                    MemoryComputeKey.of("e", MemoryComputeKey.andOperator(), true, false),
+                    MemoryComputeKey.of("f", MemoryComputeKey.setOperator(), true, false)));
         }
 
         @Override
@@ -1153,7 +1181,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
-            return Collections.singleton(MemoryComputeKey.of("test", MemoryComputeKey.sumIntegerOperator(), false));
+            return Collections.singleton(MemoryComputeKey.of("test", MemoryComputeKey.sumIntegerOperator(), true, false));
         }
 
         @Override
@@ -1471,7 +1499,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
-            return Collections.singleton(MemoryComputeKey.<Long>of("workerCount", MemoryComputeKey.sumLongOperator(), false));
+            return Collections.singleton(MemoryComputeKey.<Long>of("workerCount", MemoryComputeKey.sumLongOperator(), true, false));
         }
 
         /*public void workerIterationStart(final Memory memory) {
@@ -2081,9 +2109,9 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         @Override
         public Set<MemoryComputeKey> getMemoryComputeKeys() {
             return new HashSet<>(Arrays.asList(
-                    MemoryComputeKey.of("m1", MemoryComputeKey.orOperator(), true),
-                    MemoryComputeKey.of("m2", MemoryComputeKey.andOperator(), true),
-                    MemoryComputeKey.of("m3", MemoryComputeKey.sumLongOperator(), false)));
+                    MemoryComputeKey.of("m1", MemoryComputeKey.orOperator(), true, true),
+                    MemoryComputeKey.of("m2", MemoryComputeKey.andOperator(), true, true),
+                    MemoryComputeKey.of("m3", MemoryComputeKey.sumLongOperator(), true, false)));
         }
 
         @Override
@@ -2132,4 +2160,119 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
             return "anObject";
         }
     }
+
+    ///////////////////////////////////
+
+    @Test
+    @LoadGraphWith(MODERN)
+    public void shouldSupportBroadcastKeys() throws Exception {
+        final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramP()).submit().get();
+        assertTrue(result.memory().exists("m1"));
+        assertFalse(result.memory().exists("m2"));
+        assertFalse(result.memory().exists("m3"));
+        assertTrue(result.memory().exists("m4"));
+        assertTrue(result.memory().get("m1"));
+        assertEquals(-18, result.memory().<Integer>get("m4").intValue());
+        assertEquals(2, result.memory().keys().size());
+    }
+
+    private static class VertexProgramP extends StaticVertexProgram {
+
+        @Override
+        public void setup(final Memory memory) {
+            assertFalse(memory.exists("m1"));  // or
+            assertFalse(memory.exists("m2"));  // and
+            assertFalse(memory.exists("m3"));  // long
+            assertFalse(memory.exists("m4"));  // int
+            memory.set("m1", false);
+            memory.set("m2", true);
+            memory.set("m3", 0l);
+            memory.set("m4", 0);
+        }
+
+        @Override
+        public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
+            if (memory.isInitialIteration()) {
+                assertFalse(memory.exists("m1"));
+                assertTrue(memory.exists("m2"));
+                assertTrue(memory.get("m2"));
+                assertFalse(memory.exists("m3"));
+                assertTrue(memory.exists("m4"));
+                assertEquals(0, memory.<Integer>get("m4").intValue());
+                memory.add("m1", false);
+                memory.add("m2", true);
+                memory.add("m3", 1l);
+                memory.add("m4", -1);
+            } else {
+                assertFalse(memory.exists("m1")); // no broadcast
+                assertTrue(memory.exists("m2"));
+                assertFalse(memory.exists("m3")); // no broadcast
+                assertTrue(memory.exists("m4"));
+                try {
+                    assertFalse(memory.get("m1"));
+                    fail();
+                } catch (final Exception e) {
+                    validateException(Memory.Exceptions.memoryDoesNotExist("m1"), e);
+                }
+                assertTrue(memory.get("m2"));
+                try {
+                    assertEquals(6l, memory.<Long>get("m3").longValue());
+                    fail();
+                } catch (final Exception e) {
+                    validateException(Memory.Exceptions.memoryDoesNotExist("m3"), e);
+                }
+                assertEquals(-6l, memory.<Integer>get("m4").intValue());
+                ///
+                memory.add("m1", true);
+                memory.add("m2", true);
+                memory.add("m3", 2l);
+                memory.add("m4", -2);
+            }
+        }
+
+        @Override
+        public boolean terminate(final Memory memory) {
+            assertTrue(memory.exists("m1"));
+            assertTrue(memory.exists("m2"));
+            assertTrue(memory.exists("m3"));
+            assertTrue(memory.exists("m4"));
+            if (memory.isInitialIteration()) {
+                assertFalse(memory.get("m1"));
+                assertTrue(memory.get("m2"));
+                assertEquals(6l, memory.<Long>get("m3").longValue());
+                assertEquals(-6, memory.<Integer>get("m4").intValue());
+                return false;
+            } else {
+                assertTrue(memory.get("m1"));
+                assertTrue(memory.get("m2"));
+                assertEquals(18l, memory.<Long>get("m3").longValue());
+                assertEquals(-18, memory.<Integer>get("m4").intValue());
+                return true;
+            }
+        }
+
+        @Override
+        public Set<MessageScope> getMessageScopes(final Memory memory) {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Set<MemoryComputeKey> getMemoryComputeKeys() {
+            return new HashSet<>(Arrays.asList(
+                    MemoryComputeKey.of("m1", MemoryComputeKey.orOperator(), false, false),
+                    MemoryComputeKey.of("m2", MemoryComputeKey.andOperator(), true, true),
+                    MemoryComputeKey.of("m3", MemoryComputeKey.sumLongOperator(), false, true),
+                    MemoryComputeKey.of("m4", MemoryComputeKey.sumIntegerOperator(), true, false)));
+        }
+
+        @Override
+        public GraphComputer.ResultGraph getPreferredResultGraph() {
+            return GraphComputer.ResultGraph.NEW;
+        }
+
+        @Override
+        public GraphComputer.Persist getPreferredPersist() {
+            return GraphComputer.Persist.VERTEX_PROPERTIES;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index e368aa6..950d3eb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -240,9 +240,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
                     // execute the vertex program
                     while (true) {
-                        memory.setInTask(true);
+                        memory.setInExecute(true);
                         viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
-                        memory.setInTask(false);
+                        memory.setInExecute(false);
                         if (this.vertexProgram.terminate(memory))
                             break;
                         else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
index cb672e1..0b85969 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -44,37 +44,37 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public final class SparkMemory implements Memory.Admin, Serializable {
 
-    public final Map<String, MemoryComputeKey> memoryKeys = new HashMap<>();
-    private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
+    public final Map<String, MemoryComputeKey> memoryComputeKeys = new HashMap<>();
+    private final Map<String, Accumulator<ObjectWritable>> sparkMemory = new HashMap<>();
+    private final AtomicInteger iteration = new AtomicInteger(0);
     private final AtomicLong runtime = new AtomicLong(0l);
-    private final Map<String, Accumulator<ObjectWritable>> memory = new HashMap<>();
     private Broadcast<Map<String, Object>> broadcast;
-    private boolean inTask = false;
+    private boolean inExecute = false;
 
     public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
         if (null != vertexProgram) {
             for (final MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) {
-                this.memoryKeys.put(key.getKey(), key);
+                this.memoryComputeKeys.put(key.getKey(), key);
             }
         }
         for (final MapReduce mapReduce : mapReducers) {
-            this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
+            this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false, false));
         }
-        for (final MemoryComputeKey memoryComputeKey : this.memoryKeys.values()) {
-            this.memory.put(
+        for (final MemoryComputeKey memoryComputeKey : this.memoryComputeKeys.values()) {
+            this.sparkMemory.put(
                     memoryComputeKey.getKey(),
                     sparkContext.accumulator(ObjectWritable.empty(), memoryComputeKey.getKey(), new MemoryAccumulator<>(memoryComputeKey)));
         }
-        this.broadcast = sparkContext.broadcast(new HashMap<>());
+        this.broadcast = sparkContext.broadcast(Collections.emptyMap());
     }
 
     @Override
     public Set<String> keys() {
-        if (this.inTask)
+        if (this.inExecute)
             return this.broadcast.getValue().keySet();
         else {
             final Set<String> trueKeys = new HashSet<>();
-            this.memory.forEach((key, value) -> {
+            this.sparkMemory.forEach((key, value) -> {
                 if (!value.value().isEmpty())
                     trueKeys.add(key);
             });
@@ -109,8 +109,12 @@ public final class SparkMemory implements Memory.Admin, Serializable {
 
     @Override
     public <R> R get(final String key) throws IllegalArgumentException {
-        final ObjectWritable<R> r = (ObjectWritable<R>) (this.inTask ? this.broadcast.value().get(key) : this.memory.get(key).value());
-        if (r.isEmpty())
+        if (!this.memoryComputeKeys.containsKey(key))
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        if (this.inExecute && !this.memoryComputeKeys.get(key).isBroadcast())
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        final ObjectWritable<R> r = (ObjectWritable<R>) (this.inExecute ? this.broadcast.value().get(key) : this.sparkMemory.get(key).value());
+        if (null == r || r.isEmpty())
             throw Memory.Exceptions.memoryDoesNotExist(key);
         else
             return r.get();
@@ -119,8 +123,8 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     @Override
     public void add(final String key, final Object value) {
         checkKeyValue(key, value);
-        if (this.inTask)
-            this.memory.get(key).add(new ObjectWritable<>(value));
+        if (this.inExecute)
+            this.sparkMemory.get(key).add(new ObjectWritable<>(value));
         else
             throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
     }
@@ -128,10 +132,10 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     @Override
     public void set(final String key, final Object value) {
         checkKeyValue(key, value);
-        if (this.inTask)
+        if (this.inExecute)
             throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
         else
-            this.memory.get(key).setValue(new ObjectWritable<>(value));
+            this.sparkMemory.get(key).setValue(new ObjectWritable<>(value));
     }
 
     @Override
@@ -140,25 +144,25 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     }
 
     protected void complete() {
-        this.memoryKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(memoryComputeKey -> this.memory.remove(memoryComputeKey.getKey()));
+        this.memoryComputeKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(memoryComputeKey -> this.sparkMemory.remove(memoryComputeKey.getKey()));
     }
 
-    protected void setInTask(final boolean inTask) {
-        this.inTask = inTask;
+    protected void setInExecute(final boolean inExecute) {
+        this.inExecute = inExecute;
     }
 
     protected void broadcastMemory(final JavaSparkContext sparkContext) {
         this.broadcast.destroy(true); // do we need to block?
         final Map<String, Object> toBroadcast = new HashMap<>();
-        this.memory.forEach((key, object) -> {
-            if (!object.value().isEmpty())
+        this.sparkMemory.forEach((key, object) -> {
+            if (!object.value().isEmpty() && this.memoryComputeKeys.get(key).isBroadcast())
                 toBroadcast.put(key, object.value());
         });
         this.broadcast = sparkContext.broadcast(toBroadcast);
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.containsKey(key))
+        if (!this.memoryComputeKeys.containsKey(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/14d0d484/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
index a1a3270..4186087 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -54,13 +55,13 @@ public final class TinkerMemory implements Memory.Admin {
             }
         }
         for (final MapReduce mapReduce : mapReducers) {
-            this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
+            this.memoryKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false, false));
         }
     }
 
     @Override
     public Set<String> keys() {
-        return this.previousMap.keySet();
+        return this.previousMap.keySet().stream().filter(key -> !this.inExecute || this.memoryKeys.get(key).isBroadcast()).collect(Collectors.toSet());
     }
 
     @Override
@@ -109,6 +110,8 @@ public final class TinkerMemory implements Memory.Admin {
         final R r = (R) this.previousMap.get(key);
         if (null == r)
             throw Memory.Exceptions.memoryDoesNotExist(key);
+        else if (this.inExecute && !this.memoryKeys.get(key).isBroadcast())
+            throw Memory.Exceptions.memoryDoesNotExist(key);
         else
             return r;
     }