You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/04 18:55:26 UTC

[2/8] incubator-tinkerpop git commit: a bunch of knick-knack optimizations generally in TraversalVertexProgram and specifically in SparkGraphComputer. If there are no HALTED_TRAVERSERS, then do not propagate an empty set -- property.remove(). In Spark, if

A bunch of knick-knack optimizations generally in TraversalVertexProgram and specifically in SparkGraphComputer. If there are no HALTED_TRAVERSERS, then do not propagate an empty set -- property.remove(). In Spark, if there are no outgoing messages or new view, do not propagate empty ViewPayloads -- using null. Found a memory bug in TraversalVertexProgram where, if the HALTED_TRAVERSERS are supposed to go back to the master traversal, they were still being persisted across the vertices. These tweaks should definitely reduce stress on large graphs as the memory footprint is greatly reduced. Unfortunately, we still need reduceByKey() even on empty views/messages as it's not known that it's empty until after the action.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/cd5524d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/cd5524d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/cd5524d7

Branch: refs/heads/master
Commit: cd5524d73928c0e9b8a2260fad1b1e29c3f53ef5
Parents: 4a78886
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon May 2 16:16:15 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon May 2 16:16:15 2016 -0600

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       | 28 +++++++++++---------
 .../computer/traversal/TraverserExecutor.java   |  6 +++--
 .../process/computer/GraphComputerTest.java     |  4 +--
 .../spark/process/computer/SparkExecutor.java   | 23 ++++++++--------
 .../computer/payload/ViewOutgoingPayload.java   |  2 +-
 .../process/computer/payload/ViewPayload.java   |  2 +-
 6 files changed, 34 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 8e4a75e..694e307 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
@@ -59,7 +60,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSid
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
@@ -117,7 +117,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     private PureTraversal<?, ?> traversal;
     private TraversalMatrix<?, ?> traversalMatrix;
     private final Set<MapReduce> mapReducers = new HashSet<>();
-    private boolean keepDistributedHaltedTraversers = true;
+    private boolean returnHaltedTraversers = false;
 
     private TraversalVertexProgram() {
     }
@@ -143,8 +143,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         /// traversal is compiled and ready to be introspected
         this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
         // if results will be serialized out, don't save halted traversers across the cluster
-        this.keepDistributedHaltedTraversers =
-                !(this.traversal.get().getParent().asStep().getNextStep() instanceof ComputerResultStep || // if its just going to stream it out, don't distribute
+        this.returnHaltedTraversers =
+                (this.traversal.get().getParent().asStep().getNextStep() instanceof ComputerResultStep || // if its just going to stream it out, don't distribute
                         this.traversal.get().getParent().asStep().getNextStep() instanceof EmptyStep ||  // same as above, but if using TraversalVertexProgramStep directly
                         (this.traversal.get().getParent().asStep().getNextStep() instanceof ProfileStep && // same as above, but needed for profiling
                                 this.traversal.get().getParent().asStep().getNextStep().getNextStep() instanceof ComputerResultStep));
@@ -166,7 +166,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             this.traversal.get().getSideEffects().register(profileStep.getId(), new MutableMetricsSupplier(profileStep.getPreviousStep()), ProfileStep.ProfileBiOperator.instance());
         }
         // register TraversalVertexProgram specific memory compute keys
-        this.memoryComputeKeys.add(MemoryComputeKey.of(HALTED_TRAVERSERS, Operator.addAll, false, this.keepDistributedHaltedTraversers)); // only keep if it will be preserved
+        this.memoryComputeKeys.add(MemoryComputeKey.of(HALTED_TRAVERSERS, Operator.addAll, false, !this.returnHaltedTraversers)); // only keep if it will be preserved
         this.memoryComputeKeys.add(MemoryComputeKey.of(ACTIVE_TRAVERSERS, Operator.addAll, true, true));
         this.memoryComputeKeys.add(MemoryComputeKey.of(MUTATED_MEMORY_KEYS, Operator.addAll, false, true));
         this.memoryComputeKeys.add(MemoryComputeKey.of(COMPLETED_BARRIERS, Operator.addAll, true, true));
@@ -207,10 +207,11 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             if (step instanceof Barrier)
                 ((Barrier) this.traversalMatrix.getStepById(stepId)).done();
         }
+        // define halted traversers
+        final TraverserSet<Object> haltedTraversers = vertex.<TraverserSet<Object>>property(HALTED_TRAVERSERS).orElse(new TraverserSet<>());
+        vertex.property(VertexProperty.Cardinality.single, HALTED_TRAVERSERS, haltedTraversers);
         //////////////////
         if (memory.isInitialIteration()) {    // ITERATION 1
-            final TraverserSet<Object> haltedTraversers = vertex.<TraverserSet<Object>>property(HALTED_TRAVERSERS).orElse(new TraverserSet<>());
-            vertex.property(VertexProperty.Cardinality.single, HALTED_TRAVERSERS, haltedTraversers);
             final TraverserSet<Object> activeTraversers = new TraverserSet<>();
             IteratorUtils.removeOnNext(haltedTraversers.iterator()).forEachRemaining(traverser -> {
                 traverser.setStepId(this.traversal.get().getStartStep().getId());
@@ -229,19 +230,20 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 graphStep.forEachRemaining(traverser -> {
                     if (traverser.isHalted()) {
                         traverser.detach();
-                        haltedTraversers.add((Traverser.Admin) traverser);
-                        if (!this.keepDistributedHaltedTraversers)
+                        if (this.returnHaltedTraversers)
                             memory.add(HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+                        else
+                            haltedTraversers.add((Traverser.Admin) traverser);
                     } else
                         activeTraversers.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, !this.keepDistributedHaltedTraversers));
+            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers));
         } else {  // ITERATION 1+
-            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory, !this.keepDistributedHaltedTraversers));
+            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers));
         }
-        if (!this.keepDistributedHaltedTraversers)
-            vertex.<TraverserSet>property(HALTED_TRAVERSERS).value().clear();
+        if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
+            vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 2ba925b..167924f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -154,9 +154,10 @@ public final class TraverserExecutor {
                         traverser.addLabels(step.getLabels());  // this might need to be generalized for working with global barriers too
                         if (traverser.isHalted()) {
                             traverser.detach();
-                            haltedTraversers.add(traverser);
                             if (returnHaltedTraversers)
                                 memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser.split()));
+                            else
+                                haltedTraversers.add(traverser);
                         } else {
                             traverser.detach();
                             traverserSet.add(traverser);
@@ -175,9 +176,10 @@ public final class TraverserExecutor {
             step.forEachRemaining(traverser -> {
                 if (traverser.isHalted()) {
                     traverser.detach();
-                    haltedTraversers.add(traverser);
                     if (returnHaltedTraversers)
                         memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser.split()));
+                    else
+                        haltedTraversers.add(traverser);
                 } else {
                     activeTraversers.add(traverser);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 520c611..761ae06 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -1973,10 +1973,10 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         assertEquals(2l, traversers.stream().filter(s -> s.get().equals("software")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
         assertEquals(6, graph3.traversal().V().count().next().intValue());
         assertEquals(6, graph3.traversal().E().count().next().intValue());
-        assertEquals(6, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
+        assertEquals(0, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
         assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
         assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
-        assertEquals(30, graph3.traversal().V().values().count().next().intValue());
+        assertEquals(24, graph3.traversal().V().values().count().next().intValue()); // no halted traversers
 
         // TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram off of the PeerPressureVertexProgram job.
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 4ae5912..cd1440d 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -96,28 +96,27 @@ public final class SparkExecutor {
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
                         // revive compute properties if they already exist
-                        if (memory.isInitialIteration() && elementComputeKeysArray.length > 0) {
+                        if (memory.isInitialIteration() && elementComputeKeysArray.length > 0)
                             vertex.properties(elementComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
-                        }
                         // drop any computed properties that are cached in memory
-                        if (elementComputeKeysArray.length > 0)
-                            vertex.dropVertexProperties(elementComputeKeysArray);
+                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
-                        // previousView.clear(); // no longer needed so kill it from memory
+                        previousView.clear(); // no longer needed so kill it from memory
                         ///
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
-                        // incomingMessages.clear(); // no longer needed so kill it from memory
+                        incomingMessages.clear(); // no longer needed so kill it from memory
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
-                                IteratorUtils.list(
-                                        IteratorUtils.map(
-                                                IteratorUtils.filter(
-                                                        vertex.properties(elementComputeKeysArray),
-                                                        VertexProperty::isPresent),
-                                                property -> DetachedFactory.detach(property, true)));
+                                IteratorUtils.list(IteratorUtils.map(
+                                        IteratorUtils.filter(
+                                                vertex.properties(elementComputeKeysArray),
+                                                VertexProperty::isPresent),
+                                        property -> DetachedFactory.detach(property, true)));
+                        // drop any computed properties that are cached in memory
+                        if (elementComputeKeysArray.length > 0) vertex.dropVertexProperties(elementComputeKeysArray);
                         final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
index f532c4e..5e1cc0b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java
@@ -42,7 +42,7 @@ public final class ViewOutgoingPayload<M> implements Payload {
     }
 
     public ViewPayload getView() {
-        return new ViewPayload(null == this.view ? Collections.emptyList() : this.view);
+        return new ViewPayload(this.view);
     }
 
     public List<Tuple2<Object, M>> getOutgoingMessages() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cd5524d7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
index dec0e66..1cdf5f4 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewPayload.java
@@ -38,6 +38,6 @@ public final class ViewPayload implements Payload {
     }
 
     public List<DetachedVertexProperty<Object>> getView() {
-        return this.view;
+        return null == this.view ? Collections.emptyList() : this.view;
     }
 }