You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2018/08/10 00:02:40 UTC
[10/14] tinkerpop git commit: TINKERPOP-1967 fixed up halted traversers
TINKERPOP-1967 fixed up halted traversers
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b2cb1874
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b2cb1874
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b2cb1874
Branch: refs/heads/TINKERPOP-1990
Commit: b2cb187470794bfca352c8a9c7d0c444d102e46b
Parents: 8954c27
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Jul 30 10:51:35 2018 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 9 10:54:41 2018 -0400
----------------------------------------------------------------------
.../ConnectedComponentVertexProgram.java | 42 +++++++++++++++-----
.../ConnectedComponentVertexProgramStep.java | 26 ++++++++++--
.../step/map/ConnectedComponentTest.java | 2 +-
3 files changed, 57 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
index de718f1..82907eb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
@@ -32,7 +32,10 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexPr
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.IndexedTraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -40,6 +43,8 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -61,12 +66,14 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
private static final String VOTE_TO_HALT = "gremlin.connectedComponentVertexProgram.voteToHalt";
private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true));
+
private MessageScope.Local<?> scope = MessageScope.Local.of(__::bothE);
private Set<MessageScope> scopes;
private String property = COMPONENT;
- private boolean hasHalted = false;
private PureTraversal<Vertex, Edge> edgeTraversal = null;
private Configuration configuration;
+ private TraverserSet<Vertex> haltedTraversers;
+ private IndexedTraverserSet<Vertex, Vertex> haltedTraversersIndex;
private ConnectedComponentVertexProgram() {}
@@ -85,7 +92,12 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
scopes = new HashSet<>(Collections.singletonList(scope));
this.property = configuration.getString(PROPERTY, COMPONENT);
- this.hasHalted = configuration.getBoolean(HAS_HALTED, false);
+
+ this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
+ this.haltedTraversersIndex = new IndexedTraverserSet<>(v -> v);
+ for (final Traverser.Admin<Vertex> traverser : this.haltedTraversers) {
+ this.haltedTraversersIndex.add(traverser.split());
+ }
}
@Override
@@ -104,6 +116,8 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
@Override
public void execute(final Vertex vertex, final Messenger<String> messenger, final Memory memory) {
if (memory.isInitialIteration()) {
+ copyHaltedTraversersFromMemory(vertex);
+
// on the first pass, just initialize the component to its own id then pass it to all adjacent vertices
// for evaluation
vertex.property(VertexProperty.Cardinality.single, property, vertex.id().toString());
@@ -113,7 +127,7 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
// halting traversers. only want to send messages from traversers that are still hanging about after
// the filter. the unfiltered vertices can only react to messages sent to them. of course, this can
// lead to weirdness in results.
- if (vertex.edges(Direction.BOTH).hasNext() && !(hasHalted && !vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent())) {
+ if (vertex.edges(Direction.BOTH).hasNext()) {
// since there was message passing we don't want to halt on the first round. this should only trigger
// a single pass finish if the graph is completely disconnected (technically, it won't even really
// work in cases where halted traversers come into play
@@ -148,7 +162,9 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
- return new HashSet<>(Collections.singletonList(VertexComputeKey.of(property, false)));
+ return new HashSet<>(Arrays.asList(
+ VertexComputeKey.of(property, false),
+ VertexComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, false)));
}
@Override
@@ -158,6 +174,10 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
@Override
public boolean terminate(final Memory memory) {
+ if (memory.isInitialIteration() && this.haltedTraversersIndex != null) {
+ this.haltedTraversersIndex.clear();
+ }
+
final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
if (voteToHalt) {
return true;
@@ -206,6 +226,15 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
};
}
+ private void copyHaltedTraversersFromMemory(final Vertex vertex) {
+ final Collection<Traverser.Admin<Vertex>> traversers = this.haltedTraversersIndex.get(vertex);
+ if (traversers != null) {
+ final TraverserSet<Vertex> newHaltedTraversers = new TraverserSet<>();
+ newHaltedTraversers.addAll(traversers);
+ vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, newHaltedTraversers);
+ }
+ }
+
public static ConnectedComponentVertexProgram.Builder build() {
return new ConnectedComponentVertexProgram.Builder();
}
@@ -216,11 +245,6 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
super(ConnectedComponentVertexProgram.class);
}
- public ConnectedComponentVertexProgram.Builder hasHalted(final boolean hasHalted) {
- this.configuration.setProperty(HAS_HALTED, hasHalted);
- return this;
- }
-
public ConnectedComponentVertexProgram.Builder edges(final Traversal.Admin<Vertex, Edge> edgeTraversal) {
PureTraversal.storeState(this.configuration, EDGE_TRAVERSAL, edgeTraversal);
return this;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
index edeb497..c222cfa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
@@ -29,11 +29,16 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.Configuring;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.Serializer;
+
+import java.io.IOException;
+import java.util.Base64;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
@@ -84,10 +89,25 @@ public final class ConnectedComponentVertexProgramStep extends VertexProgramStep
public ConnectedComponentVertexProgram generateProgram(final Graph graph, final Memory memory) {
final Traversal.Admin<Vertex, Edge> detachedTraversal = this.edgeTraversal.getPure();
detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
- return ConnectedComponentVertexProgram.build().
- hasHalted(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)).
+
+ final ConnectedComponentVertexProgram.Builder builder = ConnectedComponentVertexProgram.build().
edges(detachedTraversal).
- property(this.clusterProperty).create(graph);
+ property(this.clusterProperty);
+
+ if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
+ final TraverserSet<?> haltedTraversers = memory.get(TraversalVertexProgram.HALTED_TRAVERSERS);
+ if (!haltedTraversers.isEmpty()) {
+ Object haltedTraversersValue;
+ try {
+ haltedTraversersValue = Base64.getEncoder().encodeToString(Serializer.serializeObject(haltedTraversers));
+ } catch (final IOException ignored) {
+ haltedTraversersValue = haltedTraversers;
+ }
+ builder.configure(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversersValue);
+ }
+ }
+
+ return builder.create(graph);
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
index 25e618a..8b1904f 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
@@ -67,7 +67,7 @@ public abstract class ConnectedComponentTest extends AbstractGremlinProcessTest
switch (name) {
case "lop":
case "ripple":
- assertEquals("3", vertex.value(ConnectedComponentVertexProgram.COMPONENT));
+ assertEquals("1", vertex.value(ConnectedComponentVertexProgram.COMPONENT));
break;
}
counter++;