Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 18:55:25 UTC

[1/8] incubator-tinkerpop git commit: Empty lists are not created if no messages or views are created; instead, the payload is null. This helps reduce the memory footprint, both in RAM and during shuffle/disk/network transfer.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 8fd0bae0f -> 5fafb0b48


Empty lists are not created if no messages or views are created; instead, the payload is null. This helps reduce the memory footprint, both in RAM and during shuffle/disk/network transfer.
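
A minimal sketch of the pattern, with simplified, hypothetical names (the real ViewOutgoingPayload is generic over the message type and lives in spark-gremlin): store null instead of an empty list so that nothing extra is held or serialized for vertices that produced no view and no messages, and return an immutable empty list lazily on read.

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

public final class NullableViewPayloadSketch implements Serializable {

    private final List<String> view;              // null when the vertex produced no view
    private final List<String> outgoingMessages;  // null when the vertex sent no messages

    public NullableViewPayloadSketch(final List<String> view, final List<String> outgoingMessages) {
        this.view = view.isEmpty() ? null : view;                               // never hold an empty list
        this.outgoingMessages = outgoingMessages.isEmpty() ? null : outgoingMessages;
    }

    public List<String> getView() {
        return null == this.view ? Collections.emptyList() : this.view;         // callers never see the null
    }

    public List<String> getOutgoingMessages() {
        return null == this.outgoingMessages ? Collections.emptyList() : this.outgoingMessages;
    }
}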


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4a788868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4a788868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4a788868

Branch: refs/heads/master
Commit: 4a7888681152cb005d632416371b7f66da6f119c
Parents: 64b09df
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon May 2 15:06:53 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon May 2 15:06:53 2016 -0600

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkExecutor.java   | 12 +++++++++---
 .../process/computer/payload/ViewOutgoingPayload.java   |  9 +++++----
 2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4a788868/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 3e0f09a..4ae5912 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -80,7 +81,7 @@ public final class SparkExecutor {
 
         if (null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
             assert graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get());
-        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
+        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
                 // for each partition of vertices emit a view and their outgoing messages
@@ -111,13 +112,18 @@ public final class SparkExecutor {
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
-                                IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+                                IteratorUtils.list(
+                                        IteratorUtils.map(
+                                                IteratorUtils.filter(
+                                                        vertex.properties(elementComputeKeysArray),
+                                                        VertexProperty::isPresent),
+                                                property -> DetachedFactory.detach(property, true)));
                         final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
                         return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
-                }, true)); // true means that the partition is preserved
+                }, true); // true means that the partition is preserved
         // the graphRDD and the viewRDD must have the same partitioner
         assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
         // "message pass" by reducing on the vertex object id of the view and message payloads

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4a788868/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
index 20c8e09..f532c4e 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.payload;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import scala.Tuple2;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -36,15 +37,15 @@ public final class ViewOutgoingPayload<M> implements Payload {
     }
 
     public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view, final List<Tuple2<Object, M>> outgoingMessages) {
-        this.view = view;
-        this.outgoingMessages = outgoingMessages;
+        this.view = view.isEmpty() ? null : view;
+        this.outgoingMessages = outgoingMessages.isEmpty() ? null : outgoingMessages;
     }
 
     public ViewPayload getView() {
-        return new ViewPayload(this.view);
+        return new ViewPayload(null == this.view ? Collections.emptyList() : this.view);
     }
 
     public List<Tuple2<Object, M>> getOutgoingMessages() {
-        return this.outgoingMessages;
+        return null == this.outgoingMessages ? Collections.emptyList() : this.outgoingMessages;
     }
 }


[2/8] incubator-tinkerpop git commit: a bunch of knick-knack optimizations generally in TraversalVertexProgram and specifically in SparkGraphComputer. If there are no HALTED_TRAVERSERS, then do not propagate an empty set -- property.remove(). In Spark, if

Posted by ok...@apache.org.
A bunch of knick-knack optimizations, generally in TraversalVertexProgram and specifically in SparkGraphComputer. If there are no HALTED_TRAVERSERS, then do not propagate an empty set -- property.remove(). In Spark, if there are no outgoing messages or no new view, do not propagate empty ViewPayloads -- use null instead. Found a memory bug in TraversalVertexProgram where, if the HALTED_TRAVERSERS are supposed to go back to the master traversal, they were still being persisted across the vertices. These tweaks should definitely reduce stress on large graphs, as the memory footprint is greatly reduced. Unfortunately, we still need reduceByKey() even on empty views/messages, as it is not known that they are empty until after the action.
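
A rough sketch of the HALTED_TRAVERSERS rule described above (the helper below is hypothetical, not the actual TraversalVertexProgram code, though it uses the same gremlin-core API): traversers headed back to the master traversal go into memory and never stay on the vertex, an empty set is never persisted, and only a non-empty distributed set is written back as a vertex property.

import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;

public final class HaltedTraverserSketch {

    public static void persistHaltedTraversers(final Vertex vertex, final Memory memory,
                                               final TraverserSet<Object> haltedTraversers,
                                               final boolean returnHaltedTraversers) {
        if (returnHaltedTraversers)
            memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers); // results stream back to the master traversal
        if (returnHaltedTraversers || haltedTraversers.isEmpty()) {
            // never keep an empty (or unneeded) traverser set as a vertex property
            final VertexProperty<TraverserSet<Object>> property = vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS);
            if (property.isPresent())
                property.remove();
        } else {
            vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversers);
        }
    }
}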


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/cd5524d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/cd5524d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/cd5524d7

Branch: refs/heads/master
Commit: cd5524d73928c0e9b8a2260fad1b1e29c3f53ef5
Parents: 4a78886
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon May 2 16:16:15 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon May 2 16:16:15 2016 -0600

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       | 28 +++++++++++---------
 .../computer/traversal/TraverserExecutor.java   |  6 +++--
 .../process/computer/GraphComputerTest.java     |  4 +--
 .../spark/process/computer/SparkExecutor.java   | 23 ++++++++--------
 .../computer/payload/ViewOutgoingPayload.java   |  2 +-
 .../process/computer/payload/ViewPayload.java   |  2 +-
 6 files changed, 34 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 8e4a75e..694e307 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
@@ -59,7 +60,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSid
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
@@ -117,7 +117,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     private PureTraversal<?, ?> traversal;
     private TraversalMatrix<?, ?> traversalMatrix;
     private final Set<MapReduce> mapReducers = new HashSet<>();
-    private boolean keepDistributedHaltedTraversers = true;
+    private boolean returnHaltedTraversers = false;
 
     private TraversalVertexProgram() {
     }
@@ -143,8 +143,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         /// traversal is compiled and ready to be introspected
         this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
         // if results will be serialized out, don't save halted traversers across the cluster
-        this.keepDistributedHaltedTraversers =
-                !(this.traversal.get().getParent().asStep().getNextStep() instanceof ComputerResultStep || // if its just going to stream it out, don't distribute
+        this.returnHaltedTraversers =
+                (this.traversal.get().getParent().asStep().getNextStep() instanceof ComputerResultStep || // if its just going to stream it out, don't distribute
                         this.traversal.get().getParent().asStep().getNextStep() instanceof EmptyStep ||  // same as above, but if using TraversalVertexProgramStep directly
                         (this.traversal.get().getParent().asStep().getNextStep() instanceof ProfileStep && // same as above, but needed for profiling
                                 this.traversal.get().getParent().asStep().getNextStep().getNextStep() instanceof ComputerResultStep));
@@ -166,7 +166,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             this.traversal.get().getSideEffects().register(profileStep.getId(), new MutableMetricsSupplier(profileStep.getPreviousStep()), ProfileStep.ProfileBiOperator.instance());
         }
         // register TraversalVertexProgram specific memory compute keys
-        this.memoryComputeKeys.add(MemoryComputeKey.of(HALTED_TRAVERSERS, Operator.addAll, false, this.keepDistributedHaltedTraversers)); // only keep if it will be preserved
+        this.memoryComputeKeys.add(MemoryComputeKey.of(HALTED_TRAVERSERS, Operator.addAll, false, !this.returnHaltedTraversers)); // only keep if it will be preserved
         this.memoryComputeKeys.add(MemoryComputeKey.of(ACTIVE_TRAVERSERS, Operator.addAll, true, true));
         this.memoryComputeKeys.add(MemoryComputeKey.of(MUTATED_MEMORY_KEYS, Operator.addAll, false, true));
         this.memoryComputeKeys.add(MemoryComputeKey.of(COMPLETED_BARRIERS, Operator.addAll, true, true));
@@ -207,10 +207,11 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             if (step instanceof Barrier)
                 ((Barrier) this.traversalMatrix.getStepById(stepId)).done();
         }
+        // define halted traversers
+        final TraverserSet<Object> haltedTraversers = vertex.<TraverserSet<Object>>property(HALTED_TRAVERSERS).orElse(new TraverserSet<>());
+        vertex.property(VertexProperty.Cardinality.single, HALTED_TRAVERSERS, haltedTraversers);
         //////////////////
         if (memory.isInitialIteration()) {    // ITERATION 1
-            final TraverserSet<Object> haltedTraversers = vertex.<TraverserSet<Object>>property(HALTED_TRAVERSERS).orElse(new TraverserSet<>());
-            vertex.property(VertexProperty.Cardinality.single, HALTED_TRAVERSERS, haltedTraversers);
             final TraverserSet<Object> activeTraversers = new TraverserSet<>();
             IteratorUtils.removeOnNext(haltedTraversers.iterator()).forEachRemaining(traverser -> {
                 traverser.setStepId(this.traversal.get().getStartStep().getId());
@@ -229,19 +230,20 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 graphStep.forEachRemaining(traverser -> {
                     if (traverser.isHalted()) {
                         traverser.detach();
-                        haltedTraversers.add((Traverser.Admin) traverser);
-                        if (!this.keepDistributedHaltedTraversers)
+                        if (this.returnHaltedTraversers)
                             memory.add(HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+                        else
+                            haltedTraversers.add((Traverser.Admin) traverser);
                     } else
                         activeTraversers.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, !this.keepDistributedHaltedTraversers));
+            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers));
         } else {  // ITERATION 1+
-            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory, !this.keepDistributedHaltedTraversers));
+            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers));
         }
-        if (!this.keepDistributedHaltedTraversers)
-            vertex.<TraverserSet>property(HALTED_TRAVERSERS).value().clear();
+        if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
+            vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 2ba925b..167924f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -154,9 +154,10 @@ public final class TraverserExecutor {
                         traverser.addLabels(step.getLabels());  // this might need to be generalized for working with global barriers too
                         if (traverser.isHalted()) {
                             traverser.detach();
-                            haltedTraversers.add(traverser);
                             if (returnHaltedTraversers)
                                 memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser.split()));
+                            else
+                                haltedTraversers.add(traverser);
                         } else {
                             traverser.detach();
                             traverserSet.add(traverser);
@@ -175,9 +176,10 @@ public final class TraverserExecutor {
             step.forEachRemaining(traverser -> {
                 if (traverser.isHalted()) {
                     traverser.detach();
-                    haltedTraversers.add(traverser);
                     if (returnHaltedTraversers)
                         memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser.split()));
+                    else
+                        haltedTraversers.add(traverser);
                 } else {
                     activeTraversers.add(traverser);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 520c611..761ae06 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -1973,10 +1973,10 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         assertEquals(2l, traversers.stream().filter(s -> s.get().equals("software")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
         assertEquals(6, graph3.traversal().V().count().next().intValue());
         assertEquals(6, graph3.traversal().E().count().next().intValue());
-        assertEquals(6, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
+        assertEquals(0, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
         assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
         assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
-        assertEquals(30, graph3.traversal().V().values().count().next().intValue());
+        assertEquals(24, graph3.traversal().V().values().count().next().intValue()); // no halted traversers
 
         // TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram off of the PeerPressureVertexProgram job.
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 4ae5912..cd1440d 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -96,28 +96,27 @@ public final class SparkExecutor {
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
                         // revive compute properties if they already exist
-                        if (memory.isInitialIteration() && elementComputeKeysArray.length > 0) {
+                        if (memory.isInitialIteration() && elementComputeKeysArray.length > 0)
                             vertex.properties(elementComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
-                        }
                         // drop any computed properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0)
-                            vertex.dropVertexProperties(elementComputeKeysArray);
+                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
-                        // previousView.clear(); // no longer needed so kill it from memory
+                        previousView.clear(); // no longer needed so kill it from memory
                         ///
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
-                        // incomingMessages.clear(); // no longer needed so kill it from memory
+                        incomingMessages.clear(); // no longer needed so kill it from memory
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
-                                IteratorUtils.list(
-                                        IteratorUtils.map(
-                                                IteratorUtils.filter(
-                                                        vertex.properties(elementComputeKeysArray),
-                                                        VertexProperty::isPresent),
-                                                property -> DetachedFactory.detach(property, true)));
+                                IteratorUtils.list(IteratorUtils.map(
+                                        IteratorUtils.filter(
+                                                vertex.properties(elementComputeKeysArray),
+                                                VertexProperty::isPresent),
+                                        property -> DetachedFactory.detach(property, true)));
+                        // drop any computed properties that are cached in memory
+                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
                         final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
index f532c4e..5e1cc0b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
@@ -42,7 +42,7 @@ public final class ViewOutgoingPayload<M> implements Payload {
     }
 
     public ViewPayload getView() {
-        return new ViewPayload(null == this.view ? Collections.emptyList() : this.view);
+        return new ViewPayload(this.view);
     }
 
     public List<Tuple2<Object, M>> getOutgoingMessages() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
index dec0e66..1cdf5f4 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
@@ -38,6 +38,6 @@ public final class ViewPayload implements Payload {
     }
 
     public List<DetachedVertexProperty<Object>> getView() {
-        return this.view;
+        return null == this.view ? Collections.emptyList() : this.view;
     }
 }


[8/8] incubator-tinkerpop git commit: updated CHANGELOG and upgrade docs.

Posted by ok...@apache.org.
updated CHANGELOG and upgrade docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5fafb0b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5fafb0b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5fafb0b4

Branch: refs/heads/master
Commit: 5fafb0b48961fc5735437884ddebb4b9f682b955
Parents: c62ee51
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 12:55:20 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 12:55:20 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.asciidoc                                 |  2 ++
 docs/src/upgrade/release-3.2.x-incubating.asciidoc | 15 +++++++++++++++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5fafb0b4/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 47ca781..27152b7 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,8 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.2.1 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* `SparkGraphComputer` no longer shuffles empty views or empty outgoing messages in order to save time and space.
+* `TraversalVertexProgram` no longer maintains empty halted traverser properties in order to save space.
 * Added `List<P<V>>` constructors to `ConnectiveP`, `AndP`, and `OrP` for ease of use.
 * Added support for interactive (`-i`) and execute (`-e`) modes for Gremlin Console.
 * Displayed line numbers for script execution failures of `-e` and `-i`.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5fafb0b4/docs/src/upgrade/release-3.2.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
index 0d5836d..8fc96ee 100644
--- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
@@ -32,6 +32,21 @@ Please see the link:https://github.com/apache/incubator-tinkerpop/blob/3.2.1-inc
 Upgrading for Users
 ~~~~~~~~~~~~~~~~~~~
 
+TraversalVertexProgram
+^^^^^^^^^^^^^^^^^^^^^^
+
+`TraversalVertexProgram` always maintained a `HALTED_TRAVERSERS` `TraverserSet` for each vertex throughout the life
+of the OLAP computation. However, if there are no halted traversers in the set, then there is no point in keeping that
+compute property around as without it, time and space can be saved. Users that have `VertexPrograms` that are chained off
+of `TraversalVertexProgram` and have previously assumed that `HALTED_TRAVERSERS` always exists at each vertex, should no
+longer assume that.
+
+[source,java]
+// bad code
+TraverserSet haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
+// good code
+TraverserSet haltedTraversers = vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(new TraverserSet());
+
 Interrupting Traversals
 ^^^^^^^^^^^^^^^^^^^^^^^
 


[5/8] incubator-tinkerpop git commit: finally figured out how to do a reduceByKey() with empty tuples. This is the super optimization -- if there are no views and no outgoing messages, then the reduceByKey is trivially complex. For TraversalVertexProgram

Posted by ok...@apache.org.
Finally figured out how to do a reduceByKey() with empty tuples. This is the key optimization -- if there are no views and no outgoing messages, then the reduceByKey() is trivially cheap. For TraversalVertexProgram, this means that the final step takes almost no time at all. Running integration tests overnight.
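
A simplified sketch of the mechanics (hypothetical stand-in types; the real code is in SparkExecutor and uses ViewOutgoingPayload): a vertex that emits neither a view nor outgoing messages maps to a null tuple, and the nulls are filtered out before the reduceByKey() so the shuffle only carries real payloads.

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

import java.io.Serializable;
import java.util.List;

public final class EmptyPayloadFilterSketch {

    // stand-in for ViewOutgoingPayload: a vertex's view plus its outgoing messages
    public static final class Payload implements Serializable {
        final List<String> view;
        final List<String> messages;

        Payload(final List<String> view, final List<String> messages) {
            this.view = view;
            this.messages = messages;
        }

        boolean isEmpty() {
            return this.view.isEmpty() && this.messages.isEmpty();
        }
    }

    public static JavaPairRDD<Object, Payload> dropEmptyPayloads(final JavaPairRDD<Object, Payload> outgoing) {
        return outgoing
                .mapToPair(tuple -> tuple._2().isEmpty()
                        ? null                                  // nothing to shuffle for this vertex
                        : new Tuple2<>(tuple._1(), tuple._2())) // keep the real payload
                .filter(tuple -> null != tuple);                // drop the null markers before reduceByKey()
    }
}

Note that whether any payloads survive is only known after the map runs, which is why the reduceByKey() itself cannot be skipped outright -- only fed fewer tuples.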


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/79ebaf9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/79ebaf9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/79ebaf9f

Branch: refs/heads/master
Commit: 79ebaf9f94f0b645ba493551ff219a786003cc85
Parents: 6f13c0c
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon May 2 19:03:11 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon May 2 19:03:11 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 51 ++++++++++++--------
 .../computer/payload/ViewIncomingPayload.java   |  5 ++
 2 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79ebaf9f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 520701e..3ebcb01 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -88,7 +88,7 @@ public final class SparkExecutor {
                 .mapPartitionsToPair(partitionIterator -> {
                     HadoopPools.initialize(apacheConfiguration);
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
-                    final String[] elementComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
+                    final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
@@ -96,10 +96,10 @@ public final class SparkExecutor {
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
                         // revive compute properties if they already exist
-                        if (memory.isInitialIteration() && elementComputeKeysArray.length > 0)
-                            vertex.properties(elementComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
+                        if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0)
+                            vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
                         // drop any computed properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
+                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
                         assert previousView.isEmpty();
@@ -109,21 +109,22 @@ public final class SparkExecutor {
                         // assert incomingMessages.isEmpty();  // maybe the program didn't read all the messages
                         incomingMessages.clear();
                         ///
-                        final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
+                        final List<DetachedVertexProperty<Object>> nextView = vertexComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
                                 IteratorUtils.list(IteratorUtils.map(
                                         IteratorUtils.filter(
-                                                vertex.properties(elementComputeKeysArray),
+                                                vertex.properties(vertexComputeKeysArray),
                                                 VertexProperty::isPresent),
                                         property -> DetachedFactory.detach(property, true)));
                         // drop any computed properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
+                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
                         final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
-                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+                        return (nextView.isEmpty() && outgoingMessages.isEmpty()) ? null : new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
-                }, true); // true means that the partition is preserved
+                }, true)  // true means that the partition is preserved
+                .filter(tuple -> null != tuple); // if there are no messages or views, then the tuple is null (memory optimization)
         // the graphRDD and the viewRDD must have the same partitioner
         assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
         // "message pass" by reducing on the vertex object id of the view and message payloads
@@ -146,11 +147,14 @@ public final class SparkExecutor {
                         return c;
                     }
                 })
-                .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
-                .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
-                .mapValues(payload -> payload instanceof ViewIncomingPayload ?
-                        (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
-                        new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
+                .mapValues(payload -> {
+                    if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex with incoming messages
+                        return (ViewIncomingPayload<M>) payload;
+                    else if (payload instanceof ViewPayload)    // this happens if there is a vertex with no incoming messages
+                        return new ViewIncomingPayload<>((ViewPayload) payload);
+                    else                                        // this happens when there is a single message to a vertex that has no view or outgoing messages
+                        return new ViewIncomingPayload<>((MessagePayload<M>) payload);
+                });
         // the graphRDD and the viewRDD must have the same partitioner
         assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
@@ -160,16 +164,20 @@ public final class SparkExecutor {
         return newViewIncomingRDD;
     }
 
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final Set<VertexComputeKey> vertexComputeKeys) {
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
+            final JavaPairRDD<Object, VertexWritable> graphRDD,
+            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
+            final Set<VertexComputeKey> vertexComputeKeys) {
         // the graphRDD and the viewRDD must have the same partitioner
         assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
         // attach the final computed view to the cached graph
+        final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
         return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
                     final StarGraph.StarVertex vertex = tuple._1().get();
+                    vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
                     final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                     for (final DetachedVertexProperty<Object> property : view) {
-                        vertex.dropVertexProperties(property.key());
                         if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
                             property.attach(Attachable.Method.create(vertex));
                     }
@@ -181,7 +189,9 @@ public final class SparkExecutor {
     // MAP REDUCE //
     ////////////////
 
-    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+    public static <K, V> JavaPairRDD<K, V> executeMap(
+            final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
+            final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
@@ -191,14 +201,17 @@ public final class SparkExecutor {
         return mapRDD;
     }
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
+                                                                    final Configuration apacheConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
     }
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
+            final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
+            final Configuration apacheConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
             return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/79ebaf9f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
index 4c4850b..904583e 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
@@ -49,6 +49,11 @@ public final class ViewIncomingPayload<M> implements Payload {
             this.view = null;
     }
 
+    public ViewIncomingPayload(final MessagePayload<M> messagePayload) {
+        this.incomingMessages = new ArrayList<>();
+        this.incomingMessages.add(messagePayload.getMessage());
+    }
+
 
     public List<DetachedVertexProperty<Object>> getView() {
         return null == this.view ? Collections.emptyList() : this.view;


[6/8] incubator-tinkerpop git commit: some last minute cleanups, comments before PR. integration tests passed over night. Spark integration tests passed for these changes right now.

Posted by ok...@apache.org.
Some last-minute cleanups and comments before the PR. Integration tests passed overnight; Spark integration tests passed for these changes just now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8fd95021
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8fd95021
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8fd95021

Branch: refs/heads/master
Commit: 8fd9502160b7940a806247a16406663ff4b27826
Parents: 79ebaf9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue May 3 07:49:14 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue May 3 07:49:14 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 41 ++++++++++----------
 1 file changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8fd95021/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 3ebcb01..c216510 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -36,7 +36,6 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -61,7 +60,7 @@ public final class SparkExecutor {
     // DATA LOADING //
     //////////////////
 
-    public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
+    public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
         return graphRDD.mapPartitionsToPair(partitionIterator -> {
             final GraphFilter gFilter = graphFilter.clone();
             return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
@@ -99,29 +98,27 @@ public final class SparkExecutor {
                         if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0)
                             vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
                         // drop any computed properties that are cached in memory
-                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
+                        vertex.dropVertexProperties(vertexComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
                         assert previousView.isEmpty();
-                        ///
+                        // do the vertex's vertex program iteration
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
                         // assert incomingMessages.isEmpty();  // maybe the program didn't read all the messages
                         incomingMessages.clear();
-                        ///
+                        // detached the compute property view from the vertex
                         final List<DetachedVertexProperty<Object>> nextView = vertexComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
-                                IteratorUtils.list(IteratorUtils.map(
-                                        IteratorUtils.filter(
-                                                vertex.properties(vertexComputeKeysArray),
-                                                VertexProperty::isPresent),
-                                        property -> DetachedFactory.detach(property, true)));
-                        // drop any computed properties that are cached in memory
-                        if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
-                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+                                IteratorUtils.list(IteratorUtils.map(vertex.properties(vertexComputeKeysArray), vertexProperty -> DetachedFactory.detach(vertexProperty, true)));
+                        // drop compute property view as it has now been detached from the vertex
+                        vertex.dropVertexProperties(vertexComputeKeysArray);
+                        final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages being sent by this vertex
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
-                        return (nextView.isEmpty() && outgoingMessages.isEmpty()) ? null : new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+                        return (nextView.isEmpty() && outgoingMessages.isEmpty()) ?
+                                null : // if there is no view nor outgoing messages, emit nothing
+                                new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));  // else, emit the vertex id, its view, and its outgoing messages
                     });
                 }, true)  // true means that the partition is preserved
                 .filter(tuple -> null != tuple); // if there are no messages or views, then the tuple is null (memory optimization)
@@ -131,9 +128,11 @@ public final class SparkExecutor {
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
         final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
                 .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
-                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
-                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))  // emit the outgoing message payloads one by one
-                .reduceByKey(graphRDD.partitioner().get(), (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+                        // emit the view payload
+                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
+                        // emit the outgoing message payloads one by one
+                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
+                .reduceByKey(graphRDD.partitioner().get(), (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
                     if (a instanceof ViewIncomingPayload) {
                         ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
                         return a;
@@ -147,10 +146,10 @@ public final class SparkExecutor {
                         return c;
                     }
                 })
-                .mapValues(payload -> {
-                    if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex with incoming messages
+                .mapValues(payload -> { // handle various corner cases of when views don't exist, messages don't exist, or neither exists.
+                    if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex view with incoming messages
                         return (ViewIncomingPayload<M>) payload;
-                    else if (payload instanceof ViewPayload)    // this happens if there is a vertex with no incoming messages
+                    else if (payload instanceof ViewPayload)    // this happens if there is a vertex view with no incoming messages
                         return new ViewIncomingPayload<>((ViewPayload) payload);
                     else                                        // this happens when there is a single message to a vertex that has no view or outgoing messages
                         return new ViewIncomingPayload<>((MessagePayload<M>) payload);
@@ -170,12 +169,12 @@ public final class SparkExecutor {
             final Set<VertexComputeKey> vertexComputeKeys) {
         // the graphRDD and the viewRDD must have the same partitioner
         assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
-        // attach the final computed view to the cached graph
         final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
         return graphRDD.leftOuterJoin(viewIncomingRDD)
                 .mapValues(tuple -> {
                     final StarGraph.StarVertex vertex = tuple._1().get();
                     vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
+                    // attach the final computed view to the cached graph
                     final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                     for (final DetachedVertexProperty<Object> property : view) {
                         if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
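
The recurring pattern in the hunks above is to emit no payload at all (a null) when a vertex has neither a view nor outgoing messages, and to filter the nulls out before the shuffle so empty lists are never serialized. Below is a minimal, self-contained sketch of that pattern against a toy JavaPairRDD; the class name (NullPayloadSketch), the local master, and the sample data are hypothetical and are not part of the TinkerPop code base.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class NullPayloadSketch {
    public static void main(final String[] args) {
        final SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("null-payload-sketch");
        try (final JavaSparkContext sc = new JavaSparkContext(conf)) {
            // each vertex id maps to its outgoing messages; vertex 2 has nothing to send
            final List<Tuple2<Long, List<String>>> outbox = Arrays.asList(
                    new Tuple2<>(1L, Arrays.asList("msg-a", "msg-b")),
                    new Tuple2<>(2L, Collections.<String>emptyList()),
                    new Tuple2<>(3L, Collections.singletonList("msg-c")));
            final JavaPairRDD<Long, List<String>> vertices = sc.parallelizePairs(outbox);
            final JavaPairRDD<Long, List<String>> toShuffle = vertices
                    .mapToPair(tuple -> tuple._2().isEmpty()
                            ? null // no payload at all, rather than an empty list
                            : new Tuple2<Long, List<String>>(tuple._1(), tuple._2()))
                    .filter(tuple -> null != tuple); // drop the null placeholders before any shuffle
            System.out.println(toShuffle.collect()); // only vertices 1 and 3 survive
        }
    }
}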


[4/8] incubator-tinkerpop git commit: more minor memory tweaks. running integration tests overnight.

Posted by ok...@apache.org.
more minor memory tweaks. running integration tests overnight.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/6f13c0cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/6f13c0cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/6f13c0cf

Branch: refs/heads/master
Commit: 6f13c0cfc20d8c0cbf1681359792e543bd3676bc
Parents: e3a4b7f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon May 2 16:36:26 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon May 2 16:36:26 2016 -0600

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkExecutor.java         | 7 ++++---
 .../spark/process/computer/payload/ViewIncomingPayload.java   | 4 +++-
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6f13c0cf/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index cd1440d..520701e 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -101,12 +101,13 @@ public final class SparkExecutor {
                         // drop any computed properties that are cached in memory
                         if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
-                        previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
-                        previousView.clear(); // no longer needed so kill it from memory
+                        IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
+                        assert previousView.isEmpty();
                         ///
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
-                        incomingMessages.clear(); // no longer needed so kill it from memory
+                        // assert incomingMessages.isEmpty();  // maybe the program didn't read all the messages
+                        incomingMessages.clear();
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6f13c0cf/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
index 94b124b..4c4850b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
@@ -44,7 +44,9 @@ public final class ViewIncomingPayload<M> implements Payload {
 
     public ViewIncomingPayload(final ViewPayload viewPayload) {
         this.incomingMessages = null;
-        this.view = viewPayload.getView().isEmpty() ? null : viewPayload.getView();
+        this.view = viewPayload.getView();
+        if (this.view.isEmpty())
+            this.view = null;
     }
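
The removeOnNext() change in the SparkExecutor hunk above replaces a forEach-plus-clear() with an iterator that removes each element as it is consumed, so references are released while the view is being attached rather than afterwards. Here is a plain-Java sketch of the same drain-while-consuming idea using an explicit Iterator; the class and method names (DrainWhileAttachingSketch, attach) and the sample data are hypothetical.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DrainWhileAttachingSketch {

    public static void main(final String[] args) {
        final List<String> previousView = new ArrayList<>();
        previousView.add("property-a");
        previousView.add("property-b");

        final Iterator<String> iterator = previousView.iterator();
        while (iterator.hasNext()) {
            final String property = iterator.next();
            attach(property);  // stand-in for property.attach(Attachable.Method.create(vertex))
            iterator.remove(); // release the reference as soon as the element has been consumed
        }
        System.out.println("previous view now empty: " + previousView.isEmpty()); // true
    }

    private static void attach(final String property) {
        System.out.println("attached " + property);
    }
}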
 
 


[7/8] incubator-tinkerpop git commit: Merge branch 'TINKERPOP-1120'

Posted by ok...@apache.org.
Merge branch 'TINKERPOP-1120'


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c62ee51d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c62ee51d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c62ee51d

Branch: refs/heads/master
Commit: c62ee51d9552a5ec8018f25057f87fec5d2dcfac
Parents: 8fd0bae 8fd9502
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 4 12:51:59 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 4 12:51:59 2016 -0600

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       | 28 +++----
 .../computer/traversal/TraverserExecutor.java   |  6 +-
 .../process/computer/GraphComputerTest.java     |  4 +-
 .../spark/process/computer/SparkExecutor.java   | 82 ++++++++++++--------
 .../computer/payload/ViewIncomingPayload.java   |  9 ++-
 .../computer/payload/ViewOutgoingPayload.java   |  7 +-
 .../process/computer/payload/ViewPayload.java   |  2 +-
 7 files changed, 84 insertions(+), 54 deletions(-)
----------------------------------------------------------------------



[3/8] incubator-tinkerpop git commit: another null memory tweak. no point sending around empty lists --- using null instead.

Posted by ok...@apache.org.
another null memory tweak. no point sending around empty lists --- using null instead.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e3a4b7ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e3a4b7ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e3a4b7ff

Branch: refs/heads/master
Commit: e3a4b7ff9bd730b7056b4ab224ea8e9255263c9b
Parents: cd5524d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon May 2 16:25:30 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon May 2 16:25:30 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/payload/ViewIncomingPayload.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e3a4b7ff/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
index 2dc9796..94b124b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java
@@ -44,7 +44,7 @@ public final class ViewIncomingPayload<M> implements Payload {
 
     public ViewIncomingPayload(final ViewPayload viewPayload) {
         this.incomingMessages = null;
-        this.view = viewPayload.getView();
+        this.view = viewPayload.getView().isEmpty() ? null : viewPayload.getView();
     }
 
 
@@ -58,7 +58,7 @@ public final class ViewIncomingPayload<M> implements Payload {
     }
 
     public boolean hasView() {
-        return null != view;
+        return null != this.view;
     }
 
     ////////////////////
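
Taken together, the two hunks above normalize an empty view to null at construction time, so nothing but a null reference is ever held or serialized, while hasView() tells callers whether a view is present. A stripped-down illustration of that convention follows; ViewHolderSketch is a hypothetical class, not the project's ViewIncomingPayload.

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

public final class ViewHolderSketch<M> implements Serializable {

    private final List<M> view;

    public ViewHolderSketch(final List<M> view) {
        this.view = (null == view || view.isEmpty()) ? null : view; // never hold (or ship) an empty list
    }

    public boolean hasView() {
        return null != this.view;
    }

    public List<M> getView() {
        return this.hasView() ? this.view : Collections.<M>emptyList(); // materialize an empty list only on demand
    }
}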