You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2018/08/09 18:12:45 UTC

[03/12] tinkerpop git commit: TINKERPOP-1967 fixed up halted traversers

TINKERPOP-1967 fixed up halted traversers


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b2cb1874
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b2cb1874
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b2cb1874

Branch: refs/heads/master
Commit: b2cb187470794bfca352c8a9c7d0c444d102e46b
Parents: 8954c27
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Jul 30 10:51:35 2018 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 9 10:54:41 2018 -0400

----------------------------------------------------------------------
 .../ConnectedComponentVertexProgram.java        | 42 +++++++++++++++-----
 .../ConnectedComponentVertexProgramStep.java    | 26 ++++++++++--
 .../step/map/ConnectedComponentTest.java        |  2 +-
 3 files changed, 57 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
index de718f1..82907eb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/connected/ConnectedComponentVertexProgram.java
@@ -32,7 +32,10 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexPr
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.IndexedTraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -40,6 +43,8 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -61,12 +66,14 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
     private static final String VOTE_TO_HALT = "gremlin.connectedComponentVertexProgram.voteToHalt";
 
     private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true));
+
     private MessageScope.Local<?> scope = MessageScope.Local.of(__::bothE);
     private Set<MessageScope> scopes;
     private String property = COMPONENT;
-    private boolean hasHalted = false;
     private PureTraversal<Vertex, Edge> edgeTraversal = null;
     private Configuration configuration;
+    private TraverserSet<Vertex> haltedTraversers;
+    private IndexedTraverserSet<Vertex, Vertex> haltedTraversersIndex;
 
     private ConnectedComponentVertexProgram() {}
 
@@ -85,7 +92,12 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
         scopes = new HashSet<>(Collections.singletonList(scope));
 
         this.property = configuration.getString(PROPERTY, COMPONENT);
-        this.hasHalted = configuration.getBoolean(HAS_HALTED, false);
+
+        this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
+        this.haltedTraversersIndex = new IndexedTraverserSet<>(v -> v);
+        for (final Traverser.Admin<Vertex> traverser : this.haltedTraversers) {
+            this.haltedTraversersIndex.add(traverser.split());
+        }
     }
 
     @Override
@@ -104,6 +116,8 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
     @Override
     public void execute(final Vertex vertex, final Messenger<String> messenger, final Memory memory) {
         if (memory.isInitialIteration()) {
+            copyHaltedTraversersFromMemory(vertex);
+
             // on the first pass, just initialize the component to its own id then pass it to all adjacent vertices
             // for evaluation
             vertex.property(VertexProperty.Cardinality.single, property, vertex.id().toString());
@@ -113,7 +127,7 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
             // halting traversers. only want to send messages from traversers that are still hanging about after
             // the filter. the unfiltered vertices can only react to messages sent to them. of course, this can
             // lead to weirdness in results. 
-            if (vertex.edges(Direction.BOTH).hasNext() && !(hasHalted && !vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent())) {
+            if (vertex.edges(Direction.BOTH).hasNext()) {
                 // since there was message passing we don't want to halt on the first round. this should only trigger
                 // a single pass finish if the graph is completely disconnected (technically, it won't even really
                 // work in cases where halted traversers come into play
@@ -148,7 +162,9 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
 
     @Override
     public Set<VertexComputeKey> getVertexComputeKeys() {
-        return new HashSet<>(Collections.singletonList(VertexComputeKey.of(property, false)));
+        return new HashSet<>(Arrays.asList(
+                VertexComputeKey.of(property, false),
+                VertexComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, false)));
     }
 
     @Override
@@ -158,6 +174,10 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
 
     @Override
     public boolean terminate(final Memory memory) {
+        if (memory.isInitialIteration() && this.haltedTraversersIndex != null) {
+            this.haltedTraversersIndex.clear();
+        }
+
         final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
         if (voteToHalt) {
             return true;
@@ -206,6 +226,15 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
         };
     }
 
+    private void copyHaltedTraversersFromMemory(final Vertex vertex) {
+        final Collection<Traverser.Admin<Vertex>> traversers = this.haltedTraversersIndex.get(vertex);
+        if (traversers != null) {
+            final TraverserSet<Vertex> newHaltedTraversers = new TraverserSet<>();
+            newHaltedTraversers.addAll(traversers);
+            vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, newHaltedTraversers);
+        }
+    }
+
     public static ConnectedComponentVertexProgram.Builder build() {
         return new ConnectedComponentVertexProgram.Builder();
     }
@@ -216,11 +245,6 @@ public class ConnectedComponentVertexProgram implements VertexProgram<String> {
             super(ConnectedComponentVertexProgram.class);
         }
 
-        public ConnectedComponentVertexProgram.Builder hasHalted(final boolean hasHalted) {
-            this.configuration.setProperty(HAS_HALTED, hasHalted);
-            return this;
-        }
-
         public ConnectedComponentVertexProgram.Builder edges(final Traversal.Admin<Vertex, Edge> edgeTraversal) {
             PureTraversal.storeState(this.configuration, EDGE_TRAVERSAL, edgeTraversal);
             return this;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
index edeb497..c222cfa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ConnectedComponentVertexProgramStep.java
@@ -29,11 +29,16 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Configuring;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.Serializer;
+
+import java.io.IOException;
+import java.util.Base64;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
@@ -84,10 +89,25 @@ public final class ConnectedComponentVertexProgramStep extends VertexProgramStep
     public ConnectedComponentVertexProgram generateProgram(final Graph graph, final Memory memory) {
         final Traversal.Admin<Vertex, Edge> detachedTraversal = this.edgeTraversal.getPure();
         detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
-        return ConnectedComponentVertexProgram.build().
-                hasHalted(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)).
+
+        final ConnectedComponentVertexProgram.Builder builder = ConnectedComponentVertexProgram.build().
                 edges(detachedTraversal).
-                property(this.clusterProperty).create(graph);
+                property(this.clusterProperty);
+
+        if (memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS)) {
+            final TraverserSet<?> haltedTraversers = memory.get(TraversalVertexProgram.HALTED_TRAVERSERS);
+            if (!haltedTraversers.isEmpty()) {
+                Object haltedTraversersValue;
+                try {
+                    haltedTraversersValue = Base64.getEncoder().encodeToString(Serializer.serializeObject(haltedTraversers));
+                } catch (final IOException ignored) {
+                    haltedTraversersValue = haltedTraversers;
+                }
+                builder.configure(TraversalVertexProgram.HALTED_TRAVERSERS, haltedTraversersValue);
+            }
+        }
+
+        return builder.create(graph);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cb1874/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
index 25e618a..8b1904f 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ConnectedComponentTest.java
@@ -67,7 +67,7 @@ public abstract class ConnectedComponentTest extends AbstractGremlinProcessTest
             switch (name) {
                 case "lop":
                 case "ripple":
-                    assertEquals("3", vertex.value(ConnectedComponentVertexProgram.COMPONENT));
+                    assertEquals("1", vertex.value(ConnectedComponentVertexProgram.COMPONENT));
                     break;
             }
             counter++;