You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/05 22:56:10 UTC

incubator-tinkerpop git commit: Made significant memory improvements to TraverserExecutor. Realized some massive heaps on some jobs on Friendster using SparkGraphComputer and tracked it down to how I'm dealing with traversers in TraversalVertexProgram. [...]

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1131 [created] b54ddf283


Made significant memory improvements to TraverserExecutor. Realized some massive heaps on some jobs on Friendster using SparkGraphComputer and tracked it down to how I'm dealing with traversers in TraversalVertexProgram. I was not 'draining' sets of traversers and thus was using an excessive amount of memory. This really shows itself when touching edges, where you can easily generate millions of traversers, and having multiple copies of that data is bad. To make draining work, I had to update all the Iterators to support .remove(), which simply calls .remove() on the child iterator. Found a simple optimization for CountGlobalStep that will make OLAP counting much faster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b54ddf28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b54ddf28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b54ddf28

Branch: refs/heads/TINKERPOP-1131
Commit: b54ddf2830483705c4c4a865b9f2586ed457223a
Parents: 12b9b21
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 5 14:56:10 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 5 14:56:10 2016 -0700

----------------------------------------------------------------------
 .../computer/traversal/TraverserExecutor.java   | 101 +++++++++++--------
 .../traversal/step/map/CountGlobalStep.java     |  10 ++
 .../gremlin/util/iterator/DoubleIterator.java   |  12 ++-
 .../gremlin/util/iterator/IteratorUtils.java    |  25 +++++
 .../gremlin/util/iterator/MultiIterator.java    |   7 +-
 .../gremlin/util/iterator/SingleIterator.java   |   9 +-
 6 files changed, 118 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b54ddf28/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 038e309..2df682c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -31,6 +32,7 @@ import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
 
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -42,58 +44,75 @@ public final class TraverserExecutor {
 
         final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
-
         final TraverserSet<Object> aliveTraversers = new TraverserSet<>();
-        // gather incoming traversers into a traverser set and gain the 'weighted-set' optimization
+        final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+
+        final Iterator<TraverserSet<?>> messages = messenger.receiveMessages();
         final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
-        messenger.receiveMessages().forEachRemaining(traverserSet -> {
-            traverserSet.forEach(traverser -> {
+        while (messages.hasNext()) {
+            final Iterator<Traverser.Admin<Object>> traversers = (Iterator) messages.next().iterator();
+            messages.remove();
+            while (traversers.hasNext()) {
+                final Traverser.Admin<Object> traverser = traversers.next();
+                traversers.remove();
                 traverser.setSideEffects(traversalSideEffects);
                 traverser.attach(Attachable.Method.get(vertex));
-                aliveTraversers.add((Traverser.Admin) traverser);
-            });
-        });
-
+                toProcessTraversers.add((Traverser.Admin) traverser);
+            }
+        }
         // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
-        final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
-        while (!aliveTraversers.isEmpty()) {
+        Step<?, ?> previousStep = EmptyStep.instance();
+        while (!toProcessTraversers.isEmpty()) {
+            // process local traversers and if alive, repeat, else halt.
+            Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
+            while (traversers.hasNext()) {
+                final Traverser.Admin<Object> traverser = traversers.next();
+                traversers.remove();
+                final Step<?, ?> currentStep = traversalMatrix.getStepById(traverser.getStepId());
+                if (!currentStep.getId().equals(previousStep.getId()))
+                    TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+                currentStep.addStart((Traverser.Admin) traverser);
+                previousStep = currentStep;
+            }
+            TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+            assert toProcessTraversers.isEmpty();
             // process all the local objects and send messages or store locally again
-            aliveTraversers.forEach(traverser -> {
-                if (traverser.get() instanceof Element || traverser.get() instanceof Property) {      // GRAPH OBJECT
-                    // if the element is remote, then message, else store it locally for re-processing
-                    final Vertex hostingVertex = TraverserExecutor.getHostingVertex(traverser.get());
-                    if (!vertex.equals(hostingVertex)) { // necessary for path access
-                        voteToHalt.set(false);
-                        traverser.detach();
-                        messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser));
-                    } else {
-                        if (traverser.get() instanceof Attachable)   // necessary for path access to local object
-                            traverser.attach(Attachable.Method.get(vertex));
+            if (!aliveTraversers.isEmpty()) {
+                traversers = aliveTraversers.iterator();
+                while (traversers.hasNext()) {
+                    final Traverser.Admin<Object> traverser = traversers.next();
+                    traversers.remove();
+                    if (traverser.get() instanceof Element || traverser.get() instanceof Property) {      // GRAPH OBJECT
+                        // if the element is remote, then message, else store it locally for re-processing
+                        final Vertex hostingVertex = TraverserExecutor.getHostingVertex(traverser.get());
+                        if (!vertex.equals(hostingVertex)) { // necessary for path access
+                            voteToHalt.set(false);
+                            traverser.detach();
+                            messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser));
+                        } else {
+                            if (traverser.get() instanceof Attachable)   // necessary for path access to local object
+                                traverser.attach(Attachable.Method.get(vertex));
+                            toProcessTraversers.add(traverser);
+                        }
+                    } else                                                                              // STANDARD OBJECT
                         toProcessTraversers.add(traverser);
-                    }
-                } else                                                                              // STANDARD OBJECT
-                    toProcessTraversers.add(traverser);
-            });
-
-            // process local traversers and if alive, repeat, else halt.
-            aliveTraversers.clear();
-            toProcessTraversers.forEach(start -> {
-                final Step<?, ?> step = traversalMatrix.getStepById(start.getStepId());
-                step.addStart((Traverser.Admin) start);
-                step.forEachRemaining(end -> {
-                    if (end.asAdmin().isHalted()) {
-                        end.asAdmin().detach();
-                        haltedTraversers.add((Traverser.Admin) end);
-                    } else
-                        aliveTraversers.add((Traverser.Admin) end);
-                });
-            });
-
-            toProcessTraversers.clear();
+                }
+                assert aliveTraversers.isEmpty();
+            }
         }
         return voteToHalt.get();
     }
 
+    private static void drainStep(final Step<?, ?> step, final TraverserSet<?> aliveTraversers, final TraverserSet<?> haltedTraversers) {
+        step.forEachRemaining(traverser -> {
+            if (traverser.asAdmin().isHalted()) {
+                traverser.asAdmin().detach();
+                haltedTraversers.add((Traverser.Admin) traverser);
+            } else
+                aliveTraversers.add((Traverser.Admin) traverser);
+        });
+    }
+
     private static Vertex getHostingVertex(final Object object) {
         Object obj = object;
         while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b54ddf28/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
index 409c445..d090832 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
@@ -61,6 +61,16 @@ public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> imple
         return CountGlobalMapReduce.instance();
     }
 
+    @Override
+    public Traverser<Long> processNextStart() {
+        if (this.byPass) {
+            final Traverser.Admin<S> traverser = this.starts.next();
+            return traverser.asAdmin().split(1l, this); // if bypassing, just key all the traversers to 1 long (the count is going to be the bulk of course)
+        } else {
+            return super.processNextStart();
+        }
+    }
+
     ///////////
 
     private static class CountBiFunction<S> implements BiFunction<Long, Traverser<S>, Long>, Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b54ddf28/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/DoubleIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/DoubleIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/DoubleIterator.java
index e07bcef..2d7982c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/DoubleIterator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/DoubleIterator.java
@@ -28,8 +28,8 @@ import java.util.Iterator;
  */
 final class DoubleIterator<T> implements Iterator<T>, Serializable {
 
-    private final T a;
-    private final T b;
+    private T a;
+    private T b;
     private char current = 'a';
 
     protected DoubleIterator(final T a, final T b) {
@@ -43,6 +43,14 @@ final class DoubleIterator<T> implements Iterator<T>, Serializable {
     }
 
     @Override
+    public void remove() {
+        if (this.current == 'b')
+            this.a = null;
+        else if (this.current == 'x')
+            this.b = null;
+    }
+
+    @Override
     public T next() {
         if (this.current == 'x')
             throw FastNoSuchElementException.instance();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b54ddf28/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
index dc38a07..808317f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -105,6 +105,11 @@ public final class IteratorUtils {
             }
 
             @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
             public S next() {
                 if (this.count++ >= limit)
                     throw FastNoSuchElementException.instance();
@@ -205,6 +210,11 @@ public final class IteratorUtils {
             }
 
             @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
             public S next() {
                 final S s = iterator.next();
                 consumer.accept(s);
@@ -228,6 +238,11 @@ public final class IteratorUtils {
             }
 
             @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
             public E next() {
                 return function.apply(iterator.next());
             }
@@ -257,6 +272,11 @@ public final class IteratorUtils {
             }
 
             @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
             public S next() {
                 try {
                     if (null != this.nextResult) {
@@ -312,6 +332,11 @@ public final class IteratorUtils {
             }
 
             @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
             public E next() {
                 if (this.hasNext())
                     return this.currentIterator.next();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b54ddf28/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
index 02c15f4..e272a08 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
@@ -42,7 +42,7 @@ public final class MultiIterator<T> implements Iterator<T>, Serializable {
         if (this.current >= this.iterators.size())
             return false;
 
-        Iterator<T> currentIterator = iterators.get(this.current);
+        Iterator<T> currentIterator = this.iterators.get(this.current);
 
         while (true) {
             if (currentIterator.hasNext()) {
@@ -58,6 +58,11 @@ public final class MultiIterator<T> implements Iterator<T>, Serializable {
     }
 
     @Override
+    public void remove() {
+        // this.iterators.get(this.current).remove();
+    }
+
+    @Override
     public T next() {
         if (this.iterators.isEmpty()) throw FastNoSuchElementException.instance();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b54ddf28/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
index b680819..bb2da72 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
@@ -26,9 +26,9 @@ import java.util.Iterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-final class SingleIterator<T> implements Iterator<T>,Serializable {
+final class SingleIterator<T> implements Iterator<T>, Serializable {
 
-    private final T t;
+    private T t;
     private boolean alive = true;
 
     protected SingleIterator(final T t) {
@@ -41,6 +41,11 @@ final class SingleIterator<T> implements Iterator<T>,Serializable {
     }
 
     @Override
+    public void remove() {
+        this.t = null;
+    }
+
+    @Override
     public T next() {
         if (!this.alive)
             throw FastNoSuchElementException.instance();