You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/27 15:55:18 UTC

[03/10] incubator-tinkerpop git commit: lots of documentation on TraversalVertexProgram and got HaltedTraverserFactoryStrategy tested and optimized.

lots of documentation on TraversalVertexProgram and got HaltedTraverserFactoryStrategy tested and optimized.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3978e7bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3978e7bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3978e7bd

Branch: refs/heads/master
Commit: 3978e7bda5f4896ea1c2815c889e206afcbaccaa
Parents: 19f16f1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 25 12:25:31 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 25 12:25:31 2016 -0600

----------------------------------------------------------------------
 .../computer/traversal/MasterExecutor.java      |  6 +-
 .../computer/traversal/WorkerExecutor.java      | 56 +++++++-----
 .../HaltedTraverserFactoryStrategy.java         | 13 ++-
 .../HaltedTraverserFactoryStrategyTest.java     | 91 ++++++++++++++++++++
 4 files changed, 140 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
index b5ec12b..b994f1e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
@@ -58,6 +58,8 @@ final class MasterExecutor {
 
     }
 
+    // the halted traversers can either be reference or detached elements -- this lets the user determine how much data about the element is returned
+    // see HaltedTraverserFactoryStrategy for how this is all connected
     protected static <R> Traverser.Admin<R> detach(final Traverser.Admin<R> traverser, final Class haltedTraverserFactory) {
         if (haltedTraverserFactory.equals(DetachedFactory.class))
             traverser.set(DetachedFactory.detach(traverser.get(), true));
@@ -68,11 +70,12 @@ final class MasterExecutor {
         return traverser;
     }
 
+    // handle traversers and data that were sent from the workers to the master traversal via memory
     protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> traverserSet, final Set<String> completedBarriers) {
         if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
             for (final String key : memory.<Set<String>>get(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
                 final Step<Object, Object> step = traversalMatrix.getStepById(key);
-                if (null == step) continue;
+                if (null == step) continue; // why? how can this happen?
                 assert step instanceof Barrier;
                 completedBarriers.add(step.getId());
                 if (!(step instanceof LocalBarrier)) {  // local barriers don't do any processing on the master traversal (they just lock on the workers)
@@ -81,6 +84,7 @@ final class MasterExecutor {
                     while (step.hasNext()) {
                         traverserSet.add(step.next());
                     }
+                    // if it was a reducing barrier step, reset the barrier to its seed value
                     if (step instanceof ReducingBarrierStep)
                         memory.set(step.getId(), ((ReducingBarrierStep) step).getSeedSupplier().get());
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
index 5bc3da9..f833b6f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -35,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceElement;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
@@ -58,30 +60,37 @@ final class WorkerExecutor {
                                      final Memory memory,
                                      final boolean returnHaltedTraversers,
                                      final Class haltedTraverserFactory) {
-
         final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
         final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
         final TraverserSet<Object> activeTraversers = new TraverserSet<>();
         final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+        final boolean isTesting = Boolean.valueOf(System.getProperty("is.testing", "false"));
 
         ////////////////////////////////
         // GENERATE LOCAL TRAVERSERS //
         ///////////////////////////////
 
-        // these are traversers that are going from OLTP to OLAP
-        final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
-        final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
-        while (iterator.hasNext()) {
-            final Traverser.Admin<Object> traverser = iterator.next();
-            if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
-                // iterator.remove(); ConcurrentModificationException
-                traverser.attach(Attachable.Method.get(vertex));
-                traverser.setSideEffects(traversalSideEffects);
-                toProcessTraversers.add(traverser);
+        // some memory implementations are accessed by multiple threads and thus, concurrent modification can happen at iterator.remove()
+        // it's better to reduce the memory footprint and shorten the active traverser list, so synchronization is worth it.
+        // most distributed OLAP systems partition the memory and thus, this synchronization effectively does nothing
+        synchronized (memory) {
+            // these are traversers that are going from OLTP to OLAP
+            // these traversers were broadcasted from the master traversal to the workers for attachment
+            final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+            final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
+            while (iterator.hasNext()) {
+                final Traverser.Admin<Object> traverser = iterator.next();
+                if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
+                    iterator.remove();
+                    traverser.attach(Attachable.Method.get(vertex));
+                    traverser.setSideEffects(traversalSideEffects);
+                    toProcessTraversers.add(traverser);
+                }
             }
         }
         // these are traversers that exist from from a local barrier
+        // these traversers were simply saved at the local vertex while the master traversal synchronized the barrier
         vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
             IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
                 traverser.attach(Attachable.Method.get(vertex));
@@ -90,24 +99,25 @@ final class WorkerExecutor {
             });
             vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
         });
-        // these are traversers that have been messaged to the vertex
+        // these are traversers that have been messaged to the vertex from another vertex
         final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
         while (messages.hasNext()) {
-            final Iterator<Traverser.Admin<Object>> traversers = messages.next().iterator();
-            while (traversers.hasNext()) {
-                final Traverser.Admin<Object> traverser = traversers.next();
-                traversers.remove();
+            IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> {
+                // this is internal testing to ensure that messaged elements are always ReferenceXXX and not DetachedXXX (related to HaltedTraverserFactoryStrategy)
+                if (isTesting && !(messenger instanceof SingleMessenger) && traverser.get() instanceof Element)
+                    assert traverser.get() instanceof ReferenceElement;
                 if (traverser.isHalted()) {
                     if (returnHaltedTraversers)
                         memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
                     else
                         haltedTraversers.add(traverser);
                 } else {
+                    // traverser is not halted and thus, should be processed locally
                     traverser.attach(Attachable.Method.get(vertex));
                     traverser.setSideEffects(traversalSideEffects);
                     toProcessTraversers.add(traverser);
                 }
-            }
+            });
         }
 
         ///////////////////////////////
@@ -116,7 +126,6 @@ final class WorkerExecutor {
 
         // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
         while (!toProcessTraversers.isEmpty()) {
-            // process local traversers and if alive, repeat, else halt.
             Step<Object, Object> previousStep = EmptyStep.instance();
             Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
             while (traversers.hasNext()) {
@@ -129,6 +138,7 @@ final class WorkerExecutor {
                 previousStep = currentStep;
             }
             WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory);
+            // all processed traversers should be either halted or active
             assert toProcessTraversers.isEmpty();
             // process all the local objects and send messages or store locally again
             if (!activeTraversers.isEmpty()) {
@@ -136,6 +146,7 @@ final class WorkerExecutor {
                 while (traversers.hasNext()) {
                     final Traverser.Admin<Object> traverser = traversers.next();
                     traversers.remove();
+                    // decide whether to message the traverser or to process it locally
                     if (traverser.get() instanceof Element || traverser.get() instanceof Property) {      // GRAPH OBJECT
                         // if the element is remote, then message, else store it locally for re-processing
                         final Vertex hostingVertex = WorkerExecutor.getHostingVertex(traverser.get());
@@ -167,9 +178,10 @@ final class WorkerExecutor {
             if (step instanceof Bypassing)
                 ((Bypassing) step).setBypass(true);
             if (step instanceof LocalBarrier) {
+                // local barrier traversers are stored on the vertex until the master traversal synchronizes the system
                 final LocalBarrier<Object> barrier = (LocalBarrier<Object>) step;
-                final TraverserSet<Object> traverserSet = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
-                vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, traverserSet);
+                final TraverserSet<Object> localBarrierTraversers = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
+                vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, localBarrierTraversers);
                 while (barrier.hasNextBarrier()) {
                     final TraverserSet<Object> barrierSet = barrier.nextBarrier();
                     IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> {
@@ -183,7 +195,7 @@ final class WorkerExecutor {
                             else
                                 haltedTraversers.add(traverser.detach());
                         } else
-                            traverserSet.add(traverser.detach());
+                            localBarrierTraversers.add(traverser);
                     });
                 }
                 memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
@@ -198,7 +210,7 @@ final class WorkerExecutor {
             step.forEachRemaining(traverser -> {
                 if (traverser.isHalted() &&
                         // if its a ReferenceFactory (one less iteration)
-                        ((returnHaltedTraversers || haltedTraverserFactory == ReferenceFactory.class) &&
+                        ((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserFactory) &&
                                 (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
                                 getHostingVertex(traverser.get()).equals(vertex))) {
                     if (returnHaltedTraversers)

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
index b046986..c2f3855 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategy.java
@@ -22,12 +22,14 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorat
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -42,11 +44,16 @@ public final class HaltedTraverserFactoryStrategy extends AbstractTraversalStrat
     }
 
     public void apply(final Traversal.Admin<?, ?> traversal) {
-        TraversalHelper.getStepsOfAssignableClass(TraversalVertexProgramStep.class, traversal)
-                .forEach(step -> step.setHaltedTraverserFactory(this.haltedTraverserFactory));
+        // only the root traversal (the one whose parent is an EmptyStep) is configured; nested traversals are left untouched
+        if (traversal.getParent() instanceof EmptyStep) {
+            final List<TraversalVertexProgramStep> steps = TraversalHelper.getStepsOfAssignableClass(TraversalVertexProgramStep.class, traversal);
+            // only the last TraversalVertexProgramStep (the one returning data) gets the configured factory; earlier ones keep their default
+            if (!steps.isEmpty())
+                steps.get(steps.size() - 1).setHaltedTraverserFactory(this.haltedTraverserFactory);
+        }
     }
 
-    public static HaltedTraverserFactoryStrategy detach() {
+    public static HaltedTraverserFactoryStrategy detached() { // renamed from detach(); NOTE(review): breaks callers if detach() was ever released
         return new HaltedTraverserFactoryStrategy(DetachedFactory.class);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3978e7bd/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
new file mode 100644
index 0000000..af0b3b7
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.computer.traversal.strategy.decoration;
+
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.HaltedTraverserFactoryStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class HaltedTraverserFactoryStrategyTest {
+    private String previousIsTesting; // value of "is.testing" before setup(), restored by shutdown()
+    @Before
+    public void setup() {
+        // necessary as ComputerResult step for testing purposes attaches Attachables
+        previousIsTesting = System.setProperty("is.testing", "false");
+    }
+
+    @After
+    public void shutdown() {
+        if (null == previousIsTesting) System.clearProperty("is.testing"); // it was unset; setProperty rejects null
+        else System.setProperty("is.testing", previousIsTesting);
+    }
+
+    @Test
+    public void shouldReturnDetachedElements() {
+        Graph graph = TinkerFactory.createModern();
+        GraphTraversalSource g = graph.traversal().withComputer().withStrategies(HaltedTraverserFactoryStrategy.detached());
+        g.V().out().forEachRemaining(vertex -> assertEquals(DetachedVertex.class, vertex.getClass()));
+        g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(DetachedVertexProperty.class, vertexProperty.getClass()));
+        g.V().out().values("name").forEachRemaining(value -> assertEquals(String.class, value.getClass()));
+        g.V().out().outE().forEachRemaining(edge -> assertEquals(DetachedEdge.class, edge.getClass()));
+        g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(DetachedProperty.class, property.getClass()));
+        g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass()));
+        g.V().out().out().forEachRemaining(vertex -> assertEquals(DetachedVertex.class, vertex.getClass()));
+    }
+
+    @Test
+    public void shouldReturnReferenceElements() {
+        Graph graph = TinkerFactory.createModern();
+        GraphTraversalSource g = graph.traversal().withComputer().withStrategies(HaltedTraverserFactoryStrategy.reference());
+        g.V().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
+        g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(ReferenceVertexProperty.class, vertexProperty.getClass()));
+        g.V().out().values("name").forEachRemaining(value -> assertEquals(String.class, value.getClass()));
+        g.V().out().outE().forEachRemaining(edge -> assertEquals(ReferenceEdge.class, edge.getClass()));
+        g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class, property.getClass()));
+        g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass()));
+        g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
+        // without the strategy, halted traversers are expected to come back as reference elements too
+        g = graph.traversal().withComputer();
+        g.V().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
+        g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(ReferenceVertexProperty.class, vertexProperty.getClass()));
+        g.V().out().values("name").forEachRemaining(value -> assertEquals(String.class, value.getClass()));
+        g.V().out().outE().forEachRemaining(edge -> assertEquals(ReferenceEdge.class, edge.getClass()));
+        g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class, property.getClass()));
+        g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass()));
+        g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
+    }
+}