You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2016/05/31 12:49:22 UTC
[15/49] incubator-tinkerpop git commit: lots more documentation on
TraversalVertexProgram, and I really combed through the code and was able to
find numerous minor optimizations here and there.
lots more documentation on TraversalVertexProgram, and I really combed through the code and was able to find numerous minor optimizations here and there.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e6f2caa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e6f2caa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e6f2caa8
Branch: refs/heads/TINKERPOP-1298
Commit: e6f2caa89adc8e6630dadc613ca9b6b92416c223
Parents: 7255844
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed May 25 14:54:15 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed May 25 14:54:15 2016 -0600
----------------------------------------------------------------------
.../computer/traversal/MasterExecutor.java | 47 +++++++++-----------
.../traversal/TraversalVertexProgram.java | 2 +
.../computer/traversal/WorkerExecutor.java | 32 ++++++-------
.../gremlin/hadoop/structure/HadoopGraph.java | 2 +-
.../HaltedTraverserFactoryStrategyTest.java | 7 ++-
5 files changed, 43 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
index b994f1e..1c1e9d2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
@@ -43,7 +43,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.HashSet;
import java.util.Iterator;
@@ -70,20 +69,17 @@ final class MasterExecutor {
return traverser;
}
- // handle traversers and data that were sent from the workers to the master traversal via memory
- protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> traverserSet, final Set<String> completedBarriers) {
+ protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> toProcessTraversers, final Set<String> completedBarriers) {
+ // handle traversers and data that were sent from the workers to the master traversal via memory
if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
for (final String key : memory.<Set<String>>get(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
final Step<Object, Object> step = traversalMatrix.getStepById(key);
- if (null == step) continue; // why? how can this happen?
assert step instanceof Barrier;
completedBarriers.add(step.getId());
if (!(step instanceof LocalBarrier)) { // local barriers don't do any processing on the master traversal (they just lock on the workers)
final Barrier<Object> barrier = (Barrier<Object>) step;
barrier.addBarrier(memory.get(key));
- while (step.hasNext()) {
- traverserSet.add(step.next());
- }
+ step.forEachRemaining(toProcessTraversers::add);
// if it was a reducing barrier step, reset the barrier to its seed value
if (step instanceof ReducingBarrierStep)
memory.set(step.getId(), ((ReducingBarrierStep) step).getSeedSupplier().get());
@@ -100,34 +96,33 @@ final class MasterExecutor {
final TraverserSet<Object> haltedTraversers,
final Class haltedTraverserFactory) {
-
while (!toProcessTraversers.isEmpty()) {
final TraverserSet<Object> localActiveTraversers = new TraverserSet<>();
Step<Object, Object> previousStep = EmptyStep.instance();
Step<Object, Object> currentStep = EmptyStep.instance();
- final Iterator<Traverser.Admin<Object>> traversers = IteratorUtils.removeOnNext(toProcessTraversers.iterator());
+ // these are traversers that are at the master traversal and will either halt here or be distributed back to the workers as needed
+ final Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
while (traversers.hasNext()) {
final Traverser.Admin<Object> traverser = traversers.next();
- traverser.set(DetachedFactory.detach(traverser.get(), true));
+ traversers.remove();
+ traverser.set(DetachedFactory.detach(traverser.get(), true)); // why?
traverser.setSideEffects(traversal.get().getSideEffects());
- if (traverser.isHalted()) {
+ if (traverser.isHalted())
haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
- } else if (isRemoteTraverser(traverser, traversalMatrix)) { // this is so that patterns like order().name work as expected.
+ else if (isRemoteTraverser(traverser, traversalMatrix)) // this is so that patterns like order().name work as expected. try and stay local as long as possible
remoteActiveTraversers.add(traverser.detach());
- } else {
+ else {
currentStep = traversalMatrix.getStepById(traverser.getStepId());
if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep)) {
while (previousStep.hasNext()) {
final Traverser.Admin<Object> result = previousStep.next();
- if (result.isHalted()) {
+ if (result.isHalted())
haltedTraversers.add(MasterExecutor.detach(result, haltedTraverserFactory));
- } else {
- if (isRemoteTraverser(result, traversalMatrix)) {
- remoteActiveTraversers.add(result.detach());
- } else
- localActiveTraversers.add(result);
- }
+ else if (isRemoteTraverser(result, traversalMatrix))
+ remoteActiveTraversers.add(result.detach());
+ else
+ localActiveTraversers.add(result);
}
}
currentStep.addStart(traverser);
@@ -137,14 +132,12 @@ final class MasterExecutor {
if (!(currentStep instanceof EmptyStep)) {
while (currentStep.hasNext()) {
final Traverser.Admin<Object> traverser = currentStep.next();
- if (traverser.isHalted()) {
+ if (traverser.isHalted())
haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
- } else {
- if (isRemoteTraverser(traverser, traversalMatrix)) {
- remoteActiveTraversers.add(traverser.detach());
- } else
- localActiveTraversers.add(traverser);
- }
+ else if (isRemoteTraverser(traverser, traversalMatrix))
+ remoteActiveTraversers.add(traverser.detach());
+ else
+ localActiveTraversers.add(traverser);
}
}
assert toProcessTraversers.isEmpty();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index d4daaac..4479306 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -297,7 +297,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
if (voteToHalt) {
// local traverser sets to process
final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+ // traversers that need to be sent back to the workers (no longer can be processed locally by the master traversal)
final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>();
+ // halted traversers that have completed their journey
final TraverserSet<Object> haltedTraversers = memory.get(HALTED_TRAVERSERS);
// get all barrier traversers
final Set<String> completedBarriers = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
index f833b6f..5798af0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -36,7 +35,6 @@ import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceElement;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -65,19 +63,18 @@ final class WorkerExecutor {
final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
final TraverserSet<Object> activeTraversers = new TraverserSet<>();
final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
- final boolean isTesting = Boolean.valueOf(System.getProperty("is.testing", "false"));
////////////////////////////////
// GENERATE LOCAL TRAVERSERS //
///////////////////////////////
- // some memory systems are interacted by multiple threads and thus, concurrent modification can happen at iterator.remove()
+ // these are traversers that are going from OLTP (master) to OLAP (workers)
+ // these traversers were broadcasted from the master traversal to the workers for attachment
+ final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+ // some memory systems are interacted with by multiple threads and thus, concurrent modification can happen at iterator.remove().
// its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it.
- // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing
- synchronized (memory) {
- // these are traversers that are going from OLTP to OLAP
- // these traversers were broadcasted from the master traversal to the workers for attachment
- final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+ // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing.
+ synchronized (maybeActiveTraversers) {
final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
while (iterator.hasNext()) {
final Traverser.Admin<Object> traverser = iterator.next();
@@ -97,20 +94,19 @@ final class WorkerExecutor {
traverser.setSideEffects(traversalSideEffects);
toProcessTraversers.add(traverser);
});
+ assert previousActiveTraversers.isEmpty();
+ // remove the property to save space
vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
});
// these are traversers that have been messaged to the vertex from another vertex
final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
while (messages.hasNext()) {
IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> {
- // this is internal testing to ensure that messaged elements are always ReferenceXXX and not DetachedXXX (related to HaltedTraverserFactoryStrategy)
- if (isTesting && !(messenger instanceof SingleMessenger) && traverser.get() instanceof Element)
- assert traverser.get() instanceof ReferenceElement;
if (traverser.isHalted()) {
if (returnHaltedTraversers)
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
else
- haltedTraversers.add(traverser);
+ haltedTraversers.add(traverser.detach());
} else {
// traverser is not halted and thus, should be processed locally
traverser.attach(Attachable.Method.get(vertex));
@@ -132,6 +128,7 @@ final class WorkerExecutor {
final Traverser.Admin<Object> traverser = traversers.next();
traversers.remove();
final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
+ // try and fill up the current step as much as possible with traversers to get a bulking optimization
if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory);
currentStep.addStart(traverser);
@@ -150,12 +147,11 @@ final class WorkerExecutor {
if (traverser.get() instanceof Element || traverser.get() instanceof Property) { // GRAPH OBJECT
// if the element is remote, then message, else store it locally for re-processing
final Vertex hostingVertex = WorkerExecutor.getHostingVertex(traverser.get());
- if (!vertex.equals(hostingVertex)) { // necessary for path access
- voteToHalt.set(false);
+ if (!vertex.equals(hostingVertex)) { // if its host is not the current vertex, then send the traverser to the hosting vertex
+ voteToHalt.set(false); // if message is passed, then don't vote to halt
messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser.detach()));
} else {
- if (traverser.get() instanceof Attachable) // necessary for path access to local object
- traverser.attach(Attachable.Method.get(vertex));
+ traverser.attach(Attachable.Method.get(vertex));
toProcessTraversers.add(traverser);
}
} else // STANDARD OBJECT
@@ -195,7 +191,7 @@ final class WorkerExecutor {
else
haltedTraversers.add(traverser.detach());
} else
- localBarrierTraversers.add(traverser);
+ localBarrierTraversers.add(traverser.detach());
});
}
memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index d643cd4..d0f50d0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -109,7 +109,7 @@ import java.util.stream.Stream;
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_matchXa_0sungBy_b__a_0sungBy_c__b_writtenBy_d__c_writtenBy_e__d_hasXname_George_HarisonX__e_hasXname_Bob_MarleyXX",
reason = "Hadoop-Gremlin is OLAP-oriented and for OLTP operations, linear-scan joins are required. This particular tests takes many minutes to execute.",
- computers = {"ALL"})
+ computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"}) // this is a nasty long test, just do it once in Java MatchTest
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals",
method = "g_V_matchXa_0sungBy_b__a_0writtenBy_c__b_writtenBy_d__c_sungBy_d__d_hasXname_GarciaXX",
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e6f2caa8/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
index af0b3b7..43bc94e 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/traversal/strategy/decoration/HaltedTraverserFactoryStrategyTest.java
@@ -23,10 +23,12 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorati
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPath;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge;
+import org.apache.tinkerpop.gremlin.structure.util.reference.ReferencePath;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty;
@@ -64,6 +66,7 @@ public class HaltedTraverserFactoryStrategyTest {
g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(DetachedProperty.class, property.getClass()));
g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass()));
g.V().out().out().forEachRemaining(vertex -> assertEquals(DetachedVertex.class, vertex.getClass()));
+ g.V().out().out().path().forEachRemaining(path -> assertEquals(DetachedPath.class, path.getClass()));
}
@Test
@@ -77,7 +80,8 @@ public class HaltedTraverserFactoryStrategyTest {
g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class, property.getClass()));
g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass()));
g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
- //
+ g.V().out().out().path().forEachRemaining(path -> assertEquals(ReferencePath.class, path.getClass()));
+ // the default should be reference elements
g = graph.traversal().withComputer();
g.V().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
g.V().out().properties("name").forEachRemaining(vertexProperty -> assertEquals(ReferenceVertexProperty.class, vertexProperty.getClass()));
@@ -86,6 +90,7 @@ public class HaltedTraverserFactoryStrategyTest {
g.V().out().outE().properties("weight").forEachRemaining(property -> assertEquals(ReferenceProperty.class, property.getClass()));
g.V().out().outE().values("weight").forEachRemaining(value -> assertEquals(Double.class, value.getClass()));
g.V().out().out().forEachRemaining(vertex -> assertEquals(ReferenceVertex.class, vertex.getClass()));
+ g.V().out().out().path().forEachRemaining(path -> assertEquals(ReferencePath.class, path.getClass()));
}
}