You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2018/07/30 16:40:49 UTC
[2/2] tinkerpop git commit: added comments in SPVP
added comments in SPVP
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/e886acd9
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/e886acd9
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/e886acd9
Branch: refs/heads/TINKERPOP-1990
Commit: e886acd92945ec0c188c181ae69adbb87cbf461a
Parents: 2ce11bf
Author: Daniel Kuppitz <da...@hotmail.com>
Authored: Mon Jul 30 09:40:43 2018 -0700
Committer: Daniel Kuppitz <da...@hotmail.com>
Committed: Mon Jul 30 09:40:43 2018 -0700
----------------------------------------------------------------------
.../search/path/ShortestPathVertexProgram.java | 32 +++++++++++++++++++-
.../step/map/ShortestPathVertexProgramStep.java | 3 ++
2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e886acd9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java
index 549dff9..1949c53 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/search/path/ShortestPathVertexProgram.java
@@ -152,6 +152,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
}
}
+ // restore halted traversers from the configuration and build an index for direct access
this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
this.haltedTraversersIndex = new IndexedTraverserSet<>(v -> v);
for (final Traverser.Admin<Vertex> traverser : this.haltedTraversers) {
@@ -254,10 +255,15 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
if (memory.isInitialIteration()) {
+ // Use the first iteration to copy halted traversers from the halted traverser index to the respective
+ // vertices. This way the rest of the code can be simplified and always expect the HALTED_TRAVERSERS
+ // property to be available (if halted traversers exist for this vertex).
copyHaltedTraversersFromMemory(vertex);
+ // ignore vertices that don't pass the start-vertex filter
if (!isStartVertex(vertex)) return;
+ // start to track paths for all valid start-vertices
final Map<Vertex, Pair<Number, Set<Path>>> paths = new HashMap<>();
final Path path;
final Set<Path> pathSet = new HashSet<>();
@@ -267,12 +273,14 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
vertex.property(VertexProperty.Cardinality.single, PATHS, paths);
+ // send messages to valid adjacent vertices
processEdges(vertex, path, 0, messenger);
voteToHalt = false;
} else {
+ // load existing paths to this vertex and extend them based on messages received from adjacent vertices
final Map<Vertex, Pair<Number, Set<Path>>> paths =
vertex.<Map<Vertex, Pair<Number, Set<Path>>>>property(PATHS).orElseGet(HashMap::new);
final Iterator<Triplet<Path, Edge, Number>> iterator = messenger.receiveMessages();
@@ -286,6 +294,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
Path newPath = null;
+ // already know a path coming from this source vertex?
if (paths.containsKey(sourceVertex)) {
final Number currentShortestDistance = paths.get(sourceVertex).getValue0();
@@ -294,19 +303,26 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
if (cmp <= 0) {
newPath = extendPath(sourcePath, triplet.getValue1(), vertex);
if (cmp < 0) {
+ // if the path length is smaller than the current shortest path's length, replace the
+ // current set of shortest paths
final Set<Path> pathSet = new HashSet<>();
pathSet.add(newPath);
paths.put(sourceVertex, Pair.with(distance, pathSet));
} else {
+ // if the path length is equal to the current shortest path's length, add the new path
+ // to the set of shortest paths
paths.get(sourceVertex).getValue1().add(newPath);
}
}
} else if (!exceedsMaxDistance(distance)) {
+ // store the new path as the shortest path from the source vertex to the current vertex
final Set<Path> pathSet = new HashSet<>();
pathSet.add(newPath = extendPath(sourcePath, triplet.getValue1(), vertex));
paths.put(sourceVertex, Pair.with(distance, pathSet));
}
+ // if a new path was found, send messages to adjacent vertices, otherwise do nothing as there's no
+ // chance to find any new paths going forward
if (newPath != null) {
vertex.property(VertexProperty.Cardinality.single, PATHS, paths);
processEdges(vertex, newPath, distance, messenger);
@@ -315,6 +331,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
}
}
+ // VOTE_TO_HALT will be set to true if an iteration hasn't found any new paths
memory.add(VOTE_TO_HALT, voteToHalt);
}
@@ -327,12 +344,15 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
if (voteToHalt) {
final int state = memory.get(STATE);
if (state == COLLECT_PATHS) {
+ // After paths were collected,
+ // a) the VP is done in standalone mode (paths will be in memory) or
+ // b) the halted traversers will be updated in order to have the paths available in the traversal
if (this.standalone) return true;
memory.set(STATE, UPDATE_HALTED_TRAVERSERS);
return false;
}
if (state == UPDATE_HALTED_TRAVERSERS) return true;
- else memory.set(STATE, COLLECT_PATHS);
+ else memory.set(STATE, COLLECT_PATHS); // collect paths if no new paths were found
return false;
} else {
memory.set(VOTE_TO_HALT, true);
@@ -399,11 +419,13 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
}
private boolean isStartVertex(final Vertex vertex) {
+ // use the sourceVertexFilterTraversal if the VP is running in standalone mode (not part of a traversal)
if (this.standalone) {
final Traversal.Admin<Vertex, ?> filterTraversal = this.sourceVertexFilterTraversal.getPure();
filterTraversal.addStart(filterTraversal.getTraverserGenerator().generate(vertex, filterTraversal.getStartStep(), 1));
return filterTraversal.hasNext();
}
+ // ...otherwise use halted traversers to determine whether this is a start vertex
return vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent();
}
@@ -429,6 +451,7 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
if (otherV.equals(vertex))
otherV = edge.outVertex();
+ // only send message if the adjacent vertex is not yet part of the current path
if (!currentPath.objects().contains(otherV)) {
messenger.sendMessage(MessageScope.Global.of(otherV),
Triplet.with(currentPath, this.includeEdges ? edge : null,
@@ -441,6 +464,8 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
if (isStartVertex(vertex)) {
final List<Path> paths = memory.get(SHORTEST_PATHS);
if (vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent()) {
+ // replace the current set of halted traversers with new traversers that hold the shortest paths
+ // found for this vertex
final TraverserSet<Vertex> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
final TraverserSet<Path> newHaltedTraversers = new TraverserSet<>();
for (final Traverser.Admin<Vertex> traverser : haltedTraversers) {
@@ -471,6 +496,11 @@ public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Ed
&& NumberHelper.compare(distance, this.maxDistance) > 0;
}
+ /**
+ * Move any valid path into the VP's memory.
+ * @param vertex The current vertex.
+ * @param memory The VertexProgram's memory.
+ */
private void collectShortestPaths(final Vertex vertex, final Memory memory) {
final VertexProperty<Map<Vertex, Pair<Number, Set<Path>>>> pathProperty = vertex.property(PATHS);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e886acd9/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java
index 5a0f338..e9a09e2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ShortestPathVertexProgramStep.java
@@ -136,6 +136,9 @@ public final class ShortestPathVertexProgramStep extends VertexProgramStep imple
ProgramVertexProgramStep.ROOT_TRAVERSAL, rootTraversalValue,
ProgramVertexProgramStep.STEP_ID, this.id);
+ // There are two locations in which halted traversers can be stored: in memory or as vertex properties. In the
+ // former case they need to be copied to this VertexProgram's configuration as the VP won't have access to the
+ // previous VP's memory.
if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
final TraverserSet<?> haltedTraversers = memory.get(TraversalVertexProgram.HALTED_TRAVERSERS);
if (!haltedTraversers.isEmpty()) {